diff --git a/glide.lock b/glide.lock index 09f9ad2b..31f1aaa9 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 223d8e42a118e7861cb673ea58a035e99d3a98c94e4b71fb52998d320f9c3b49 -updated: 2017-11-25T22:00:24.612202481-08:00 +hash: e279cca35a5cc9a68bb266015dc6a57da749b28dabca3994b2c5dbe02309f470 +updated: 2017-11-28T00:53:04.816567531Z imports: - name: github.com/btcsuite/btcd version: 8cea3866d0f7fb12d567a20744942c0d078c7d15 diff --git a/node/node.go b/node/node.go index 5efe39b9..fff550bf 100644 --- a/node/node.go +++ b/node/node.go @@ -299,7 +299,7 @@ func NewNode(config *cfg.Config, for event := range ch { // XXX: may be not perfomant to write one event at a time txResult := event.(types.TMEventData).Unwrap().(types.EventDataTx).TxResult - txIndexer.Index(&txResult) + txIndexer.Index(&txResult, strings.Split(config.TxIndex.IndexTags, ",")) } }() diff --git a/state/txindex/indexer.go b/state/txindex/indexer.go index 2c37283c..f9908f32 100644 --- a/state/txindex/indexer.go +++ b/state/txindex/indexer.go @@ -10,10 +10,10 @@ import ( type TxIndexer interface { // AddBatch analyzes, indexes and stores a batch of transactions. - AddBatch(b *Batch) error + AddBatch(b *Batch, allowedTags []string) error // Index analyzes, indexes and stores a single transaction. - Index(result *types.TxResult) error + Index(result *types.TxResult, allowedTags []string) error // Get returns the transaction specified by hash or nil if the transaction is not indexed // or stored. @@ -26,18 +26,18 @@ type TxIndexer interface { // Batch groups together multiple Index operations to be performed at the same time. // NOTE: Batch is NOT thread-safe and must not be modified after starting its execution. type Batch struct { - Ops []types.TxResult + Ops []*types.TxResult } // NewBatch creates a new Batch. func NewBatch(n int) *Batch { return &Batch{ - Ops: make([]types.TxResult, n), + Ops: make([]*types.TxResult, n), } } // Add or update an entry for the given result.Index. -func (b *Batch) Add(result types.TxResult) error { +func (b *Batch) Add(result *types.TxResult) error { b.Ops[result.Index] = result return nil } diff --git a/state/txindex/kv/kv.go b/state/txindex/kv/kv.go index a3826c8b..ee81674b 100644 --- a/state/txindex/kv/kv.go +++ b/state/txindex/kv/kv.go @@ -2,16 +2,24 @@ package kv import ( "bytes" + "encoding/hex" "fmt" + "strconv" + "strings" + "time" + "github.com/pkg/errors" + + abci "github.com/tendermint/abci/types" wire "github.com/tendermint/go-wire" - - db "github.com/tendermint/tmlibs/db" - "github.com/tendermint/tendermint/state/txindex" "github.com/tendermint/tendermint/types" + db "github.com/tendermint/tmlibs/db" + "github.com/tendermint/tmlibs/pubsub/query" ) +var _ txindex.TxIndexer = (*TxIndex)(nil) + // TxIndex is the simplest possible indexer, backed by Key-Value storage (levelDB). // It can only index transaction by its identifier. type TxIndex struct { @@ -46,20 +54,322 @@ func (txi *TxIndex) Get(hash []byte) (*types.TxResult, error) { return txResult, nil } -// AddBatch writes a batch of transactions into the TxIndex storage. -func (txi *TxIndex) AddBatch(b *txindex.Batch) error { +// AddBatch indexes a batch of transactions using the given list of tags. +func (txi *TxIndex) AddBatch(b *txindex.Batch, allowedTags []string) error { storeBatch := txi.store.NewBatch() + for _, result := range b.Ops { - rawBytes := wire.BinaryBytes(&result) - storeBatch.Set(result.Tx.Hash(), rawBytes) + hash := result.Tx.Hash() + + // index tx by tags + for _, tag := range result.Result.Tags { + if stringInSlice(tag.Key, allowedTags) { + storeBatch.Set(keyForTag(tag, result), hash) + } + } + + // index tx by hash + rawBytes := wire.BinaryBytes(result) + storeBatch.Set(hash, rawBytes) } + storeBatch.Write() return nil } -// Index writes a single transaction into the TxIndex storage. -func (txi *TxIndex) Index(result *types.TxResult) error { - rawBytes := wire.BinaryBytes(result) - txi.store.Set(result.Tx.Hash(), rawBytes) - return nil +// Index indexes a single transaction using the given list of tags. +func (txi *TxIndex) Index(result *types.TxResult, allowedTags []string) error { + batch := txindex.NewBatch(1) + batch.Add(result) + return txi.AddBatch(batch, allowedTags) +} + +func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) { + hashes := make(map[string][]byte) // key - (base 16, upper-case hash) + + // get a list of conditions (like "tx.height > 5") + conditions := q.Conditions() + + // if there is a hash condition, return the result immediately + hash, err, ok := lookForHash(conditions) + if err != nil { + return []*types.TxResult{}, errors.Wrap(err, "error during searching for a hash in the query") + } else if ok { + res, err := txi.Get(hash) + return []*types.TxResult{res}, errors.Wrap(err, "error while retrieving the result") + } + + // conditions to skip + skipIndexes := make([]int, 0) + + // if there is a height condition ("tx.height=3"), extract it for faster lookups + height, heightIndex := lookForHeight(conditions) + if heightIndex >= 0 { + skipIndexes = append(skipIndexes, heightIndex) + } + + var hashes2 [][]byte + + // extract ranges + // if both upper and lower bounds exist, it's better to get them in order not + // no iterate over kvs that are not within range. + ranges, rangeIndexes := lookForRanges(conditions) + if len(ranges) > 0 { + skipIndexes = append(skipIndexes, rangeIndexes...) + } + for _, r := range ranges { + hashes2 = txi.matchRange(r, startKeyForRange(r, height, heightIndex > 0)) + + // initialize hashes if we're running the first time + if len(hashes) == 0 { + for _, h := range hashes2 { + hashes[hashKey(h)] = h + } + continue + } + + // no matches + if len(hashes2) == 0 { + hashes = make(map[string][]byte) + } else { + // perform intersection as we go + for _, h := range hashes2 { + k := hashKey(h) + if _, ok := hashes[k]; !ok { + delete(hashes, k) + } + } + } + } + + // for all other conditions + for i, c := range conditions { + if intInSlice(i, skipIndexes) { + continue + } + + hashes2 = txi.match(c, startKey(c, height, heightIndex > 0)) + + // initialize hashes if we're running the first time + if len(hashes) == 0 { + for _, h := range hashes2 { + hashes[hashKey(h)] = h + } + continue + } + + // no matches + if len(hashes2) == 0 { + hashes = make(map[string][]byte) + } else { + // perform intersection as we go + for _, h := range hashes2 { + k := hashKey(h) + if _, ok := hashes[k]; !ok { + delete(hashes, k) + } + } + } + } + + results := make([]*types.TxResult, len(hashes)) + i := 0 + for _, h := range hashes { + results[i], err = txi.Get(h) + if err != nil { + return []*types.TxResult{}, errors.Wrapf(err, "failed to get Tx{%X}", h) + } + i++ + } + + return results, nil +} + +func lookForHash(conditions []query.Condition) (hash []byte, err error, ok bool) { + for _, c := range conditions { + if c.Tag == types.TxHashKey { + decoded, err := hex.DecodeString(c.Operand.(string)) + return decoded, err, true + } + } + return +} + +func lookForHeight(conditions []query.Condition) (height uint64, index int) { + for i, c := range conditions { + if c.Tag == types.TxHeightKey { + return uint64(c.Operand.(int64)), i + } + } + return 0, -1 +} + +type queryRanges map[string]queryRange + +type queryRange struct { + key string + lowerBound interface{} // int || time.Time + includeLowerBound bool + upperBound interface{} // int || time.Time + includeUpperBound bool +} + +func lookForRanges(conditions []query.Condition) (ranges queryRanges, indexes []int) { + ranges = make(queryRanges) + for i, c := range conditions { + if isRangeOperation(c.Op) { + r, ok := ranges[c.Tag] + if !ok { + r = queryRange{key: c.Tag} + } + switch c.Op { + case query.OpGreater: + r.lowerBound = c.Operand + case query.OpGreaterEqual: + r.includeLowerBound = true + r.lowerBound = c.Operand + case query.OpLess: + r.upperBound = c.Operand + case query.OpLessEqual: + r.includeUpperBound = true + r.upperBound = c.Operand + } + ranges[c.Tag] = r + indexes = append(indexes, i) + } + } + return ranges, indexes +} + +func isRangeOperation(op query.Operator) bool { + switch op { + case query.OpGreater, query.OpGreaterEqual, query.OpLess, query.OpLessEqual: + return true + default: + return false + } +} + +func (txi *TxIndex) match(c query.Condition, startKey []byte) (hashes [][]byte) { + if c.Op == query.OpEqual { + it := txi.store.IteratorPrefix(startKey) + for it.Next() { + hashes = append(hashes, it.Value()) + } + } else if c.Op == query.OpContains { + // XXX: full scan + it := txi.store.Iterator() + for it.Next() { + // if it is a hash key, continue + if !strings.Contains(string(it.Key()), "/") { + continue + } + if strings.Contains(extractValueFromKey(it.Key()), c.Operand.(string)) { + hashes = append(hashes, it.Value()) + } + } + } else { + panic("other operators should be handled already") + } + return +} + +func startKey(c query.Condition, height uint64, heightSpecified bool) []byte { + var key string + if heightSpecified { + key = fmt.Sprintf("%s/%v/%d", c.Tag, c.Operand, height) + } else { + key = fmt.Sprintf("%s/%v", c.Tag, c.Operand) + } + return []byte(key) +} + +func startKeyForRange(r queryRange, height uint64, heightSpecified bool) []byte { + var lowerBound interface{} + if r.includeLowerBound { + lowerBound = r.lowerBound + } else { + switch t := r.lowerBound.(type) { + case int64: + lowerBound = t + 1 + case time.Time: + lowerBound = t.Unix() + 1 + default: + panic("not implemented") + } + } + var key string + if heightSpecified { + key = fmt.Sprintf("%s/%v/%d", r.key, lowerBound, height) + } else { + key = fmt.Sprintf("%s/%v", r.key, lowerBound) + } + return []byte(key) +} + +func (txi *TxIndex) matchRange(r queryRange, startKey []byte) (hashes [][]byte) { + it := txi.store.IteratorPrefix(startKey) + defer it.Release() + for it.Next() { + // no other way to stop iterator other than checking for upperBound + switch (r.upperBound).(type) { + case int64: + v, err := strconv.ParseInt(extractValueFromKey(it.Key()), 10, 64) + if err == nil && v == r.upperBound { + if r.includeUpperBound { + hashes = append(hashes, it.Value()) + } + break + } + // XXX: passing time in a ABCI Tags is not yet implemented + // case time.Time: + // v := strconv.ParseInt(extractValueFromKey(it.Key()), 10, 64) + // if v == r.upperBound { + // break + // } + } + hashes = append(hashes, it.Value()) + } + return +} + +func extractValueFromKey(key []byte) string { + s := string(key) + parts := strings.SplitN(s, "/", 3) + return parts[1] +} + +func keyForTag(tag *abci.KVPair, result *types.TxResult) []byte { + switch tag.ValueType { + case abci.KVPair_STRING: + return []byte(fmt.Sprintf("%s/%v/%d/%d", tag.Key, tag.ValueString, result.Height, result.Index)) + case abci.KVPair_INT: + return []byte(fmt.Sprintf("%s/%v/%d/%d", tag.Key, tag.ValueInt, result.Height, result.Index)) + // case abci.KVPair_TIME: + // return []byte(fmt.Sprintf("%s/%d/%d/%d", tag.Key, tag.ValueTime.Unix(), result.Height, result.Index)) + default: + panic(fmt.Sprintf("Undefined value type: %v", tag.ValueType)) + } +} + +func hashKey(hash []byte) string { + return fmt.Sprintf("%X", hash) +} + +func stringInSlice(a string, list []string) bool { + for _, b := range list { + if b == a { + return true + } + } + return false +} + +func intInSlice(a int, list []int) bool { + for _, b := range list { + if b == a { + return true + } + } + return false } diff --git a/state/txindex/kv/kv_test.go b/state/txindex/kv/kv_test.go index f814fabe..b1f9840e 100644 --- a/state/txindex/kv/kv_test.go +++ b/state/txindex/kv/kv_test.go @@ -1,6 +1,7 @@ package kv import ( + "fmt" "io/ioutil" "os" "testing" @@ -11,6 +12,7 @@ import ( "github.com/tendermint/tendermint/state/txindex" "github.com/tendermint/tendermint/types" db "github.com/tendermint/tmlibs/db" + "github.com/tendermint/tmlibs/pubsub/query" ) func TestTxIndex(t *testing.T) { @@ -21,28 +23,89 @@ func TestTxIndex(t *testing.T) { hash := tx.Hash() batch := txindex.NewBatch(1) - if err := batch.Add(*txResult); err != nil { + if err := batch.Add(txResult); err != nil { t.Error(err) } - err := indexer.AddBatch(batch) - require.Nil(t, err) + err := indexer.AddBatch(batch, []string{}) + require.NoError(t, err) loadedTxResult, err := indexer.Get(hash) - require.Nil(t, err) + require.NoError(t, err) assert.Equal(t, txResult, loadedTxResult) tx2 := types.Tx("BYE BYE WORLD") txResult2 := &types.TxResult{1, 0, tx2, abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeType_OK, Log: "", Tags: []*abci.KVPair{}}} hash2 := tx2.Hash() - err = indexer.Index(txResult2) - require.Nil(t, err) + err = indexer.Index(txResult2, []string{}) + require.NoError(t, err) loadedTxResult2, err := indexer.Get(hash2) - require.Nil(t, err) + require.NoError(t, err) assert.Equal(t, txResult2, loadedTxResult2) } +func TestTxSearch(t *testing.T) { + indexer := &TxIndex{store: db.NewMemDB()} + + tx := types.Tx("HELLO WORLD") + tags := []*abci.KVPair{ + &abci.KVPair{Key: "account.number", ValueType: abci.KVPair_INT, ValueInt: 1}, + &abci.KVPair{Key: "account.owner", ValueType: abci.KVPair_STRING, ValueString: "Ivan"}, + &abci.KVPair{Key: "not_allowed", ValueType: abci.KVPair_STRING, ValueString: "Vlad"}, + } + txResult := &types.TxResult{1, 0, tx, abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeType_OK, Log: "", Tags: tags}} + hash := tx.Hash() + + allowedTags := []string{"account.number", "account.owner", "account.date"} + err := indexer.Index(txResult, allowedTags) + require.NoError(t, err) + + testCases := []struct { + q string + expectError bool + resultsLength int + results []*types.TxResult + }{ + // search by hash + {fmt.Sprintf("tx.hash = '%X'", hash), false, 1, []*types.TxResult{txResult}}, + // search by exact match (one tag) + {"account.number = 1", false, 1, []*types.TxResult{txResult}}, + // search by exact match (two tags) + {"account.number = 1 AND account.owner = 'Ivan'", false, 1, []*types.TxResult{txResult}}, + // search by exact match (two tags) + {"account.number = 1 AND account.owner = 'Vlad'", false, 0, []*types.TxResult{}}, + // search by range + {"account.number >= 1 AND account.number <= 5", false, 1, []*types.TxResult{txResult}}, + // search using not allowed tag + {"not_allowed = 'boom'", false, 0, []*types.TxResult{}}, + // search for not existing tx result + {"account.number >= 2 AND account.number <= 5", false, 0, []*types.TxResult{}}, + // search using not existing tag + {"account.date >= TIME 2013-05-03T14:45:00Z", false, 0, []*types.TxResult{}}, + // search using CONTAINS + {"account.owner CONTAINS 'an'", false, 1, []*types.TxResult{txResult}}, + // search using CONTAINS + {"account.owner CONTAINS 'Vlad'", false, 0, []*types.TxResult{}}, + } + + for _, tc := range testCases { + t.Run(tc.q, func(t *testing.T) { + results, err := indexer.Search(query.MustParse(tc.q)) + if tc.expectError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + + assert.Len(t, results, tc.resultsLength) + if tc.resultsLength > 0 { + assert.Equal(t, tc.results, results) + } + }) + } +} + func benchmarkTxIndex(txsCount int, b *testing.B) { tx := types.Tx("HELLO WORLD") txResult := &types.TxResult{1, 0, tx, abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeType_OK, Log: "", Tags: []*abci.KVPair{}}} @@ -58,7 +121,7 @@ func benchmarkTxIndex(txsCount int, b *testing.B) { batch := txindex.NewBatch(txsCount) for i := 0; i < txsCount; i++ { - if err := batch.Add(*txResult); err != nil { + if err := batch.Add(txResult); err != nil { b.Fatal(err) } txResult.Index += 1 @@ -67,7 +130,7 @@ func benchmarkTxIndex(txsCount int, b *testing.B) { b.ResetTimer() for n := 0; n < b.N; n++ { - err = indexer.AddBatch(batch) + err = indexer.AddBatch(batch, []string{}) } if err != nil { b.Fatal(err) diff --git a/state/txindex/null/null.go b/state/txindex/null/null.go index 27e81d73..12f5eb91 100644 --- a/state/txindex/null/null.go +++ b/state/txindex/null/null.go @@ -5,8 +5,11 @@ import ( "github.com/tendermint/tendermint/state/txindex" "github.com/tendermint/tendermint/types" + "github.com/tendermint/tmlibs/pubsub/query" ) +var _ txindex.TxIndexer = (*TxIndex)(nil) + // TxIndex acts as a /dev/null. type TxIndex struct{} @@ -16,11 +19,15 @@ func (txi *TxIndex) Get(hash []byte) (*types.TxResult, error) { } // AddBatch is a noop and always returns nil. -func (txi *TxIndex) AddBatch(batch *txindex.Batch) error { +func (txi *TxIndex) AddBatch(batch *txindex.Batch, allowedTags []string) error { return nil } // Index is a noop and always returns nil. -func (txi *TxIndex) Index(result *types.TxResult) error { +func (txi *TxIndex) Index(result *types.TxResult, allowedTags []string) error { return nil } + +func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) { + return []*types.TxResult{}, nil +} diff --git a/types/event_bus.go b/types/event_bus.go index 2e31489c..1a89ef29 100644 --- a/types/event_bus.go +++ b/types/event_bus.go @@ -116,6 +116,16 @@ func (b *EventBus) PublishEventTx(event EventDataTx) error { } tags[TxHashKey] = fmt.Sprintf("%X", event.Tx.Hash()) + if tag, ok := tags[TxHeightKey]; ok { + b.Logger.Error("Found predefined tag (value will be overwritten)", "tag", tag) + } + tags[TxHeightKey] = event.Height + + if tag, ok := tags[TxIndexKey]; ok { + b.Logger.Error("Found predefined tag (value will be overwritten)", "tag", tag) + } + tags[TxIndexKey] = event.Index + b.pubsub.PublishWithTags(ctx, TMEventData{event}, tags) return nil } diff --git a/types/events.go b/types/events.go index ed972e93..10df2643 100644 --- a/types/events.go +++ b/types/events.go @@ -140,6 +140,12 @@ const ( // TxHashKey is a reserved key, used to specify transaction's hash. // see EventBus#PublishEventTx TxHashKey = "tx.hash" + // TxHeightKey is a reserved key, used to specify transaction block's height. + // see EventBus#PublishEventTx + TxHeightKey = "tx.height" + // TxIndexKey is a reserved key, used to specify transaction's index within the block. + // see EventBus#PublishEventTx + TxIndexKey = "tx.index" ) var (