txindexer: Refactor Tx Search Aggregation (#3851)
- Replace the previous intersect call, which was called at each query condition, with a map intersection.
- Replace fmt.Sprintf with string()

closes: #3076

Benchmarks:

```
Old
goos: darwin
goarch: amd64
pkg: github.com/tendermint/tendermint/state/txindex/kv
BenchmarkTxSearch-4    200    103641206 ns/op    7998416 B/op    71171 allocs/op
PASS
ok  github.com/tendermint/tendermint/state/txindex/kv  26.019s

New
goos: darwin
goarch: amd64
pkg: github.com/tendermint/tendermint/state/txindex/kv
BenchmarkTxSearch-4    1000   38615024 ns/op    13515226 B/op   166460 allocs/op
PASS
ok  github.com/tendermint/tendermint/state/txindex/kv  53.618s
```

~62% performance improvement

Commits:

* Refactor tx search
* Add pending changelog entry
* Add tx search benchmarking
* remove intermediate hashes list; also reset timer in BenchmarkTxSearch and fix other benchmark
* fix import
* Add test cases
* Fix searching
* Replace fmt.Sprintf with string
* Update state/txindex/kv/kv.go
  Co-Authored-By: Anton Kaliaev <anton.kalyaev@gmail.com>
* Rename params
* Cleanup
* Check error in benchmarks
Parent: f3ab967a46
Commit: 14fa800773
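For context, here is a minimal, self-contained Go sketch of the two intersection strategies the commit message contrasts: the old pairwise walk over hash slices versus the new running map keyed by `string(hash)`. The helper names and toy data below are illustrative only and are not the identifiers used in the PR.

```go
package main

import "fmt"

// intersectPairwise mirrors the shape of the old approach: for every query
// condition, compare each previously matched hash against each new match,
// which costs O(n*m) byte-slice comparisons per condition.
func intersectPairwise(as, bs [][]byte) [][]byte {
	out := make([][]byte, 0)
	for _, a := range as {
		for _, b := range bs {
			if string(a) == string(b) {
				out = append(out, a)
				break
			}
		}
	}
	return out
}

// intersectByMap mirrors the shape of the new approach: keep the running
// result in a map keyed by string(hash) and drop any key that the current
// condition did not match, which costs O(n+m) map operations per condition.
func intersectByMap(filtered, matches map[string][]byte) map[string][]byte {
	for k := range filtered {
		if _, ok := matches[k]; !ok {
			delete(filtered, k)
		}
	}
	return filtered
}

func main() {
	// Toy data standing in for tx hashes matched by two query conditions.
	condA := [][]byte{[]byte("aa"), []byte("bb"), []byte("cc")}
	condB := [][]byte{[]byte("bb"), []byte("cc"), []byte("dd")}

	fmt.Printf("pairwise: %q\n", intersectPairwise(condA, condB))

	filtered := map[string][]byte{}
	for _, h := range condA {
		filtered[string(h)] = h
	}
	matches := map[string][]byte{}
	for _, h := range condB {
		matches[string(h)] = h
	}
	fmt.Printf("map-based: %d hashes remain\n", len(intersectByMap(filtered, matches)))
}
```

Keeping the running result in a map turns each additional condition into roughly O(n+m) lookups instead of O(n·m) pairwise comparisons, at the cost of the extra allocations visible in the benchmark output above.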
CHANGELOG_PENDING.md
@@ -32,6 +32,7 @@ program](https://hackerone.com/tendermint).
 - [p2p] \#3664 p2p/conn: reuse buffer when write/read from secret connection
 - [mempool] \#3826 Make `max_msg_bytes` configurable
 - [blockchain] \#3561 Add early version of the new blockchain reactor, which is supposed to be more modular and testable compared to the old version. To try it, you'll have to change `version` in the config file, [here](https://github.com/tendermint/tendermint/blob/master/config/toml.go#L303) NOTE: It's not ready for a production yet. For further information, see [ADR-40](https://github.com/tendermint/tendermint/blob/master/docs/architecture/adr-040-blockchain-reactor-refactor.md) & [ADR-43](https://github.com/tendermint/tendermint/blob/master/docs/architecture/adr-043-blockchain-riri-org.md)
+- [rpc] \#3076 Improve transaction search performance
 
 ### BUG FIXES:
 
state/txindex/kv/kv.go
@@ -11,11 +11,12 @@ import (
 
 	"github.com/pkg/errors"
 
+	dbm "github.com/tendermint/tm-db"
+
 	cmn "github.com/tendermint/tendermint/libs/common"
 	"github.com/tendermint/tendermint/libs/pubsub/query"
 	"github.com/tendermint/tendermint/state/txindex"
 	"github.com/tendermint/tendermint/types"
-	dbm "github.com/tendermint/tm-db"
 )
 
 const (
@@ -163,8 +164,8 @@ func (txi *TxIndex) indexEvents(result *types.TxResult, hash []byte, store dbm.S
 // both lower and upper bounds, so we are not performing a full scan. Results
 // from querying indexes are then intersected and returned to the caller.
 func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) {
-	var hashes [][]byte
 	var hashesInitialized bool
+	filteredHashes := make(map[string][]byte)
 
 	// get a list of conditions (like "tx.height > 5")
 	conditions := q.Conditions()
@@ -193,10 +194,16 @@ func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) {
 
 		for _, r := range ranges {
 			if !hashesInitialized {
-				hashes = txi.matchRange(r, startKey(r.key))
+				filteredHashes = txi.matchRange(r, startKey(r.key), filteredHashes, true)
 				hashesInitialized = true
+
+				// Ignore any remaining conditions if the first condition resulted
+				// in no matches (assuming implicit AND operand).
+				if len(filteredHashes) == 0 {
+					break
+				}
 			} else {
-				hashes = intersect(hashes, txi.matchRange(r, startKey(r.key)))
+				filteredHashes = txi.matchRange(r, startKey(r.key), filteredHashes, false)
 			}
 		}
 	}
@@ -211,21 +218,26 @@ func (txi *TxIndex) Search(q *query.Query) ([]*types.TxResult, error) {
 		}
 
 		if !hashesInitialized {
-			hashes = txi.match(c, startKeyForCondition(c, height))
+			filteredHashes = txi.match(c, startKeyForCondition(c, height), filteredHashes, true)
 			hashesInitialized = true
+
+			// Ignore any remaining conditions if the first condition resulted
+			// in no matches (assuming implicit AND operand).
+			if len(filteredHashes) == 0 {
+				break
+			}
 		} else {
-			hashes = intersect(hashes, txi.match(c, startKeyForCondition(c, height)))
+			filteredHashes = txi.match(c, startKeyForCondition(c, height), filteredHashes, false)
 		}
 	}
 
-	results := make([]*types.TxResult, len(hashes))
-	i := 0
-	for _, h := range hashes {
-		results[i], err = txi.Get(h)
+	results := make([]*types.TxResult, 0, len(filteredHashes))
+	for _, h := range filteredHashes {
+		res, err := txi.Get(h)
 		if err != nil {
 			return nil, errors.Wrapf(err, "failed to get Tx{%X}", h)
 		}
-		i++
+		results = append(results, res)
 	}
 
 	// sort by height & index by default
@@ -353,63 +365,115 @@ func isRangeOperation(op query.Operator) bool {
 	}
 }
 
-func (txi *TxIndex) match(c query.Condition, startKeyBz []byte) (hashes [][]byte) {
+// match returns all matching txs by hash that meet a given condition and start
+// key. An already filtered result (filteredHashes) is provided such that any
+// non-intersecting matches are removed.
+//
+// NOTE: filteredHashes may be empty if no previous condition has matched.
+func (txi *TxIndex) match(c query.Condition, startKeyBz []byte, filteredHashes map[string][]byte, firstRun bool) map[string][]byte {
+	// A previous match was attempted but resulted in no matches, so we return
+	// no matches (assuming AND operand).
+	if !firstRun && len(filteredHashes) == 0 {
+		return filteredHashes
+	}
+
+	tmpHashes := make(map[string][]byte)
+
 	if c.Op == query.OpEqual {
 		it := dbm.IteratePrefix(txi.store, startKeyBz)
 		defer it.Close()
 
 		for ; it.Valid(); it.Next() {
-			hashes = append(hashes, it.Value())
+			tmpHashes[string(it.Value())] = it.Value()
 		}
 
 	} else if c.Op == query.OpContains {
 		// XXX: startKey does not apply here.
 		// For example, if startKey = "account.owner/an/" and search query = "account.owner CONTAINS an"
 		// we can't iterate with prefix "account.owner/an/" because we might miss keys like "account.owner/Ulan/"
 		it := dbm.IteratePrefix(txi.store, startKey(c.Tag))
 		defer it.Close()
 
 		for ; it.Valid(); it.Next() {
 			if !isTagKey(it.Key()) {
 				continue
 			}
 
 			if strings.Contains(extractValueFromKey(it.Key()), c.Operand.(string)) {
-				hashes = append(hashes, it.Value())
+				tmpHashes[string(it.Value())] = it.Value()
 			}
 		}
 	} else {
 		panic("other operators should be handled already")
 	}
-	return
+
+	if len(tmpHashes) == 0 || firstRun {
+		// Either:
+		//
+		// 1. Regardless if a previous match was attempted, which may have had
+		// results, but no match was found for the current condition, then we
+		// return no matches (assuming AND operand).
+		//
+		// 2. A previous match was not attempted, so we return all results.
+		return tmpHashes
	}
+
+	// Remove/reduce matches in filteredHashes that were not found in this
+	// match (tmpHashes).
+	for k := range filteredHashes {
+		if tmpHashes[k] == nil {
+			delete(filteredHashes, k)
+		}
+	}
+
+	return filteredHashes
 }
 
-func (txi *TxIndex) matchRange(r queryRange, startKey []byte) (hashes [][]byte) {
-	// create a map to prevent duplicates
-	hashesMap := make(map[string][]byte)
+// matchRange returns all matching txs by hash that meet a given queryRange and
+// start key. An already filtered result (filteredHashes) is provided such that
+// any non-intersecting matches are removed.
+//
+// NOTE: filteredHashes may be empty if no previous condition has matched.
+func (txi *TxIndex) matchRange(r queryRange, startKey []byte, filteredHashes map[string][]byte, firstRun bool) map[string][]byte {
+	// A previous match was attempted but resulted in no matches, so we return
+	// no matches (assuming AND operand).
+	if !firstRun && len(filteredHashes) == 0 {
+		return filteredHashes
+	}
+
+	tmpHashes := make(map[string][]byte)
 	lowerBound := r.lowerBoundValue()
 	upperBound := r.upperBoundValue()
 
 	it := dbm.IteratePrefix(txi.store, startKey)
 	defer it.Close()
 
 LOOP:
 	for ; it.Valid(); it.Next() {
 		if !isTagKey(it.Key()) {
 			continue
 		}
 
 		switch r.AnyBound().(type) {
 		case int64:
 			v, err := strconv.ParseInt(extractValueFromKey(it.Key()), 10, 64)
 			if err != nil {
 				continue LOOP
 			}
 
 			include := true
 			if lowerBound != nil && v < lowerBound.(int64) {
 				include = false
 			}
 
 			if upperBound != nil && v > upperBound.(int64) {
 				include = false
 			}
 
 			if include {
-				hashesMap[fmt.Sprintf("%X", it.Value())] = it.Value()
+				tmpHashes[string(it.Value())] = it.Value()
 			}
 
 			// XXX: passing time in a ABCI Tags is not yet implemented
 			// case time.Time:
 			// 	v := strconv.ParseInt(extractValueFromKey(it.Key()), 10, 64)
@@ -418,13 +482,27 @@ LOOP:
 			// }
 		}
 	}
-	hashes = make([][]byte, len(hashesMap))
-	i := 0
-	for _, h := range hashesMap {
-		hashes[i] = h
-		i++
+
+	if len(tmpHashes) == 0 || firstRun {
+		// Either:
+		//
+		// 1. Regardless if a previous match was attempted, which may have had
+		// results, but no match was found for the current condition, then we
+		// return no matches (assuming AND operand).
+		//
+		// 2. A previous match was not attempted, so we return all results.
+		return tmpHashes
 	}
-	return
+
+	// Remove/reduce matches in filteredHashes that were not found in this
+	// match (tmpHashes).
+	for k := range filteredHashes {
+		if tmpHashes[k] == nil {
+			delete(filteredHashes, k)
+		}
+	}
+
+	return filteredHashes
 }
 
 ///////////////////////////////////////////////////////////////////////////////
@@ -471,18 +549,3 @@ func startKey(fields ...interface{}) []byte {
 	}
 	return b.Bytes()
 }
-
-///////////////////////////////////////////////////////////////////////////////
-// Utils
-
-func intersect(as, bs [][]byte) [][]byte {
-	i := make([][]byte, 0, cmn.MinInt(len(as), len(bs)))
-	for _, a := range as {
-		for _, b := range bs {
-			if bytes.Equal(a, b) {
-				i = append(i, a)
-			}
-		}
-	}
-	return i
-}
state/txindex/kv/kv_bench_test.go (new file, 72 lines)
@@ -0,0 +1,72 @@
+package kv
+
+import (
+	"crypto/rand"
+	"fmt"
+	"io/ioutil"
+	"testing"
+
+	abci "github.com/tendermint/tendermint/abci/types"
+	cmn "github.com/tendermint/tendermint/libs/common"
+	"github.com/tendermint/tendermint/libs/pubsub/query"
+	"github.com/tendermint/tendermint/types"
+	dbm "github.com/tendermint/tm-db"
+)
+
+func BenchmarkTxSearch(b *testing.B) {
+	dbDir, err := ioutil.TempDir("", "benchmark_tx_search_test")
+	if err != nil {
+		b.Errorf("failed to create temporary directory: %s", err)
+	}
+
+	db, err := dbm.NewGoLevelDB("benchmark_tx_search_test", dbDir)
+	if err != nil {
+		b.Errorf("failed to create database: %s", err)
+	}
+
+	allowedTags := []string{"transfer.address", "transfer.amount"}
+	indexer := NewTxIndex(db, IndexTags(allowedTags))
+
+	for i := 0; i < 35000; i++ {
+		events := []abci.Event{
+			{
+				Type: "transfer",
+				Attributes: []cmn.KVPair{
+					{Key: []byte("address"), Value: []byte(fmt.Sprintf("address_%d", i%100))},
+					{Key: []byte("amount"), Value: []byte("50")},
+				},
+			},
+		}
+
+		txBz := make([]byte, 8)
+		if _, err := rand.Read(txBz); err != nil {
+			b.Errorf("failed produce random bytes: %s", err)
+		}
+
+		txResult := &types.TxResult{
+			Height: int64(i),
+			Index:  0,
+			Tx:     types.Tx(string(txBz)),
+			Result: abci.ResponseDeliverTx{
+				Data:   []byte{0},
+				Code:   abci.CodeTypeOK,
+				Log:    "",
+				Events: events,
+			},
+		}
+
+		if err := indexer.Index(txResult); err != nil {
+			b.Errorf("failed to index tx: %s", err)
+		}
+	}
+
+	txQuery := query.MustParse("transfer.address = 'address_43' AND transfer.amount = 50")
+
+	b.ResetTimer()
+
+	for i := 0; i < b.N; i++ {
+		if _, err := indexer.Search(txQuery); err != nil {
+			b.Errorf("failed to query for txs: %s", err)
+		}
+	}
+}
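To reproduce numbers like those in the commit message, the new benchmark can be run with the standard Go tooling, e.g. `go test -bench=BenchmarkTxSearch -benchmem ./state/txindex/kv` from the repository root; the exact flags here are a suggestion rather than part of the PR.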
state/txindex/kv/kv_test.go
@@ -8,10 +8,11 @@ import (
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
-	abci "github.com/tendermint/tendermint/abci/types"
-	cmn "github.com/tendermint/tendermint/libs/common"
 	db "github.com/tendermint/tm-db"
 
+	abci "github.com/tendermint/tendermint/abci/types"
+	cmn "github.com/tendermint/tendermint/libs/common"
 	"github.com/tendermint/tendermint/libs/pubsub/query"
 	"github.com/tendermint/tendermint/state/txindex"
 	"github.com/tendermint/tendermint/types"
@@ -89,6 +90,11 @@ func TestTxSearch(t *testing.T) {
 		{"account.number = 1 AND account.owner = 'Ivan'", 1},
 		// search by exact match (two tags)
 		{"account.number = 1 AND account.owner = 'Vlad'", 0},
+		{"account.owner = 'Vlad' AND account.number = 1", 0},
+		{"account.number >= 1 AND account.owner = 'Vlad'", 0},
+		{"account.owner = 'Vlad' AND account.number >= 1", 0},
+		{"account.number <= 0", 0},
+		{"account.number <= 0 AND account.owner = 'Ivan'", 0},
 		// search using a prefix of the stored value
 		{"account.owner = 'Iv'", 0},
 		// search by range
@@ -310,7 +316,7 @@ func benchmarkTxIndex(txsCount int64, b *testing.B) {
 	}
 	defer os.RemoveAll(dir) // nolint: errcheck
 
-	store := db.NewDB("tx_index", "leveldb", dir)
+	store := db.NewDB("tx_index", "goleveldb", dir)
 	indexer := NewTxIndex(store)
 
 	batch := txindex.NewBatch(txsCount)