state/txindex and pkg per indexer impl

This commit is contained in:
Ethan Buchman
2017-04-18 19:29:02 -04:00
parent 16bffdf7ab
commit d572bb0c5d
11 changed files with 122 additions and 117 deletions

View File

@ -9,7 +9,7 @@ import (
. "github.com/tendermint/go-common" . "github.com/tendermint/go-common"
crypto "github.com/tendermint/go-crypto" crypto "github.com/tendermint/go-crypto"
"github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/proxy"
txindexer "github.com/tendermint/tendermint/state/tx/indexer" "github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
@ -241,12 +241,12 @@ func (s *State) ApplyBlock(eventCache types.Fireable, proxyAppConn proxy.AppConn
return fmt.Errorf("Commit failed for application: %v", err) return fmt.Errorf("Commit failed for application: %v", err)
} }
batch := txindexer.NewBatch() batch := txindex.NewBatch()
for i, r := range txResults { for i, r := range txResults {
tx := block.Txs[i] tx := block.Txs[i]
batch.Index(tx.Hash(), *r) batch.Index(tx.Hash(), *r)
} }
s.TxIndexer.Batch(batch) s.TxIndexer.AddBatch(batch)
return nil return nil
} }

View File

@ -11,7 +11,7 @@ import (
cfg "github.com/tendermint/tendermint/config/tendermint_test" cfg "github.com/tendermint/tendermint/config/tendermint_test"
"github.com/tendermint/tendermint/mempool" "github.com/tendermint/tendermint/mempool"
"github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/proxy"
txindexer "github.com/tendermint/tendermint/state/tx/indexer" "github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
@ -81,10 +81,10 @@ type dummyIndexer struct {
Indexed int Indexed int
} }
func (indexer *dummyIndexer) Tx(hash []byte) (*types.TxResult, error) { func (indexer *dummyIndexer) Get(hash []byte) (*types.TxResult, error) {
return nil, nil return nil, nil
} }
func (indexer *dummyIndexer) Batch(batch *txindexer.Batch) error { func (indexer *dummyIndexer) AddBatch(batch *txindex.Batch) error {
indexer.Indexed += batch.Size() indexer.Indexed += batch.Size()
return nil return nil
} }

View File

@ -10,8 +10,8 @@ import (
cfg "github.com/tendermint/go-config" cfg "github.com/tendermint/go-config"
dbm "github.com/tendermint/go-db" dbm "github.com/tendermint/go-db"
"github.com/tendermint/go-wire" "github.com/tendermint/go-wire"
"github.com/tendermint/tendermint/state/tx" "github.com/tendermint/tendermint/state/txindex"
txindexer "github.com/tendermint/tendermint/state/tx/indexer" "github.com/tendermint/tendermint/state/txindex/null"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
@ -41,7 +41,7 @@ type State struct {
// AppHash is updated after Commit // AppHash is updated after Commit
AppHash []byte AppHash []byte
TxIndexer tx.Indexer `json:"-"` // Transaction indexer. TxIndexer txindex.TxIndexer `json:"-"` // Transaction indexer.
} }
func LoadState(db dbm.DB) *State { func LoadState(db dbm.DB) *State {
@ -49,7 +49,7 @@ func LoadState(db dbm.DB) *State {
} }
func loadState(db dbm.DB, key []byte) *State { func loadState(db dbm.DB, key []byte) *State {
s := &State{db: db, TxIndexer: &txindexer.Null{}} s := &State{db: db, TxIndexer: &null.TxIndex{}}
buf := db.Get(key) buf := db.Get(key)
if len(buf) == 0 { if len(buf) == 0 {
return nil return nil
@ -188,6 +188,6 @@ func MakeGenesisState(db dbm.DB, genDoc *types.GenesisDoc) *State {
Validators: types.NewValidatorSet(validators), Validators: types.NewValidatorSet(validators),
LastValidators: types.NewValidatorSet(nil), LastValidators: types.NewValidatorSet(nil),
AppHash: genDoc.AppHash, AppHash: genDoc.AppHash,
TxIndexer: &txindexer.Null{}, // we do not need indexer during replay and in tests TxIndexer: &null.TxIndex{}, // we do not need indexer during replay and in tests
} }
} }

View File

@ -1,21 +0,0 @@
package tx
import (
txindexer "github.com/tendermint/tendermint/state/tx/indexer"
"github.com/tendermint/tendermint/types"
)
// Indexer interface defines methods to index and search transactions.
type Indexer interface {
// Batch analyzes, indexes or stores a batch of transactions.
//
// NOTE We do not specify Index method for analyzing a single transaction
// here because it bears heavy perfomance loses. Almost all advanced indexers
// support batching.
Batch(b *txindexer.Batch) error
// Tx returns specified transaction or nil if the transaction is not indexed
// or stored.
Tx(hash []byte) (*types.TxResult, error)
}

View File

@ -1,32 +0,0 @@
package indexer
import "github.com/tendermint/tendermint/types"
// A Batch groups together multiple Index operations you would like performed
// at the same time. The Batch structure is NOT thread-safe. You should only
// perform operations on a batch from a single thread at a time. Once batch
// execution has started, you may not modify it.
type Batch struct {
Ops map[string]types.TxResult
}
// NewBatch creates a new Batch.
func NewBatch() *Batch {
return &Batch{
Ops: make(map[string]types.TxResult),
}
}
// Index adds or updates entry for the given hash.
func (b *Batch) Index(hash []byte, result types.TxResult) error {
if len(hash) == 0 {
return ErrorEmptyHash
}
b.Ops[string(hash)] = result
return nil
}
// Size returns the total number of operations inside the batch.
func (b *Batch) Size() int {
return len(b.Ops)
}

View File

@ -1,6 +0,0 @@
package indexer
import "errors"
// ErrorEmptyHash indicates empty hash
var ErrorEmptyHash = errors.New("Transaction hash cannot be empty")

View File

@ -1,19 +0,0 @@
package indexer
import (
"errors"
"github.com/tendermint/tendermint/types"
)
// Null acts as a /dev/null.
type Null struct{}
// Tx panics.
func (indexer *Null) Tx(hash []byte) (*types.TxResult, error) {
return nil, errors.New(`Indexing is disabled (set 'tx_indexer = "kv"' in config)`)
}
// Batch returns nil.
func (indexer *Null) Batch(batch *Batch) error {
return nil
}

60
state/txindex/indexer.go Normal file
View File

@ -0,0 +1,60 @@
package txindex
import (
"errors"
"github.com/tendermint/tendermint/types"
)
// Indexer interface defines methods to index and search transactions.
type TxIndexer interface {
// Batch analyzes, indexes or stores a batch of transactions.
//
// NOTE We do not specify Index method for analyzing a single transaction
// here because it bears heavy perfomance loses. Almost all advanced indexers
// support batching.
AddBatch(b *Batch) error
// Tx returns specified transaction or nil if the transaction is not indexed
// or stored.
Get(hash []byte) (*types.TxResult, error)
}
//----------------------------------------------------
// Txs are written as a batch
// A Batch groups together multiple Index operations you would like performed
// at the same time. The Batch structure is NOT thread-safe. You should only
// perform operations on a batch from a single thread at a time. Once batch
// execution has started, you may not modify it.
type Batch struct {
Ops map[string]types.TxResult
}
// NewBatch creates a new Batch.
func NewBatch() *Batch {
return &Batch{
Ops: make(map[string]types.TxResult),
}
}
// Index adds or updates entry for the given hash.
func (b *Batch) Index(hash []byte, result types.TxResult) error {
if len(hash) == 0 {
return ErrorEmptyHash
}
b.Ops[string(hash)] = result
return nil
}
// Size returns the total number of operations inside the batch.
func (b *Batch) Size() int {
return len(b.Ops)
}
//----------------------------------------------------
// Errors
// ErrorEmptyHash indicates empty hash
var ErrorEmptyHash = errors.New("Transaction hash cannot be empty")

View File

@ -1,4 +1,4 @@
package indexer package kv
import ( import (
"bytes" "bytes"
@ -6,28 +6,29 @@ import (
db "github.com/tendermint/go-db" db "github.com/tendermint/go-db"
"github.com/tendermint/go-wire" "github.com/tendermint/go-wire"
"github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
// KV is a simplest possible indexer, backed by Key-Value storage (levelDB). // TxIndex is the simplest possible indexer, backed by Key-Value storage (levelDB).
// It could only index transaction by its identifier. // It could only index transaction by its identifier.
type KV struct { type TxIndex struct {
store db.DB store db.DB
} }
// NewKV returns new instance of KV indexer. // NewTxIndex returns new instance of TxIndex.
func NewKV(store db.DB) *KV { func NewTxIndex(store db.DB) *TxIndex {
return &KV{store: store} return &TxIndex{store: store}
} }
// Tx gets transaction from the KV storage and returns it or nil if the // Get gets transaction from the TxIndex storage and returns it or nil if the
// transaction is not found. // transaction is not found.
func (indexer *KV) Tx(hash []byte) (*types.TxResult, error) { func (txi *TxIndex) Get(hash []byte) (*types.TxResult, error) {
if len(hash) == 0 { if len(hash) == 0 {
return nil, ErrorEmptyHash return nil, txindex.ErrorEmptyHash
} }
rawBytes := indexer.store.Get(hash) rawBytes := txi.store.Get(hash)
if rawBytes == nil { if rawBytes == nil {
return nil, nil return nil, nil
} }
@ -43,9 +44,9 @@ func (indexer *KV) Tx(hash []byte) (*types.TxResult, error) {
return txResult, nil return txResult, nil
} }
// Batch writes a batch of transactions into the KV storage. // Batch writes a batch of transactions into the TxIndex storage.
func (indexer *KV) Batch(b *Batch) error { func (txi *TxIndex) AddBatch(b *txindex.Batch) error {
storeBatch := indexer.store.NewBatch() storeBatch := txi.store.NewBatch()
for hash, result := range b.Ops { for hash, result := range b.Ops {
rawBytes := wire.BinaryBytes(&result) rawBytes := wire.BinaryBytes(&result)
storeBatch.Set([]byte(hash), rawBytes) storeBatch.Set([]byte(hash), rawBytes)

View File

@ -1,4 +1,4 @@
package indexer package kv
import ( import (
"fmt" "fmt"
@ -10,27 +10,28 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
abci "github.com/tendermint/abci/types" abci "github.com/tendermint/abci/types"
db "github.com/tendermint/go-db" db "github.com/tendermint/go-db"
"github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
func TestKVIndex(t *testing.T) { func TestTxIndex(t *testing.T) {
indexer := &KV{store: db.NewMemDB()} indexer := &TxIndex{store: db.NewMemDB()}
tx := types.Tx("HELLO WORLD") tx := types.Tx("HELLO WORLD")
txResult := &types.TxResult{1, 1, abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeType_OK, Log: ""}} txResult := &types.TxResult{1, 1, abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeType_OK, Log: ""}}
hash := tx.Hash() hash := tx.Hash()
batch := NewBatch() batch := txindex.NewBatch()
batch.Index(hash, *txResult) batch.Index(hash, *txResult)
err := indexer.Batch(batch) err := indexer.AddBatch(batch)
require.Nil(t, err) require.Nil(t, err)
loadedTxResult, err := indexer.Tx(hash) loadedTxResult, err := indexer.Get(hash)
require.Nil(t, err) require.Nil(t, err)
assert.Equal(t, txResult, loadedTxResult) assert.Equal(t, txResult, loadedTxResult)
} }
func benchmarkKVIndex(txsCount int, b *testing.B) { func benchmarkTxIndex(txsCount int, b *testing.B) {
txResult := &types.TxResult{1, 1, abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeType_OK, Log: ""}} txResult := &types.TxResult{1, 1, abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeType_OK, Log: ""}}
dir, err := ioutil.TempDir("", "tx_indexer_db") dir, err := ioutil.TempDir("", "tx_indexer_db")
@ -40,9 +41,9 @@ func benchmarkKVIndex(txsCount int, b *testing.B) {
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
store := db.NewDB("tx_indexer", "leveldb", dir) store := db.NewDB("tx_indexer", "leveldb", dir)
indexer := &KV{store: store} indexer := &TxIndex{store: store}
batch := NewBatch() batch := txindex.NewBatch()
for i := 0; i < txsCount; i++ { for i := 0; i < txsCount; i++ {
batch.Index([]byte(fmt.Sprintf("hash%v", i)), *txResult) batch.Index([]byte(fmt.Sprintf("hash%v", i)), *txResult)
} }
@ -50,12 +51,12 @@ func benchmarkKVIndex(txsCount int, b *testing.B) {
b.ResetTimer() b.ResetTimer()
for n := 0; n < b.N; n++ { for n := 0; n < b.N; n++ {
err = indexer.Batch(batch) err = indexer.AddBatch(batch)
} }
} }
func BenchmarkKVIndex1(b *testing.B) { benchmarkKVIndex(1, b) } func BenchmarkTxIndex1(b *testing.B) { benchmarkTxIndex(1, b) }
func BenchmarkKVIndex500(b *testing.B) { benchmarkKVIndex(500, b) } func BenchmarkTxIndex500(b *testing.B) { benchmarkTxIndex(500, b) }
func BenchmarkKVIndex1000(b *testing.B) { benchmarkKVIndex(1000, b) } func BenchmarkTxIndex1000(b *testing.B) { benchmarkTxIndex(1000, b) }
func BenchmarkKVIndex2000(b *testing.B) { benchmarkKVIndex(2000, b) } func BenchmarkTxIndex2000(b *testing.B) { benchmarkTxIndex(2000, b) }
func BenchmarkKVIndex10000(b *testing.B) { benchmarkKVIndex(10000, b) } func BenchmarkTxIndex10000(b *testing.B) { benchmarkTxIndex(10000, b) }

View File

@ -0,0 +1,21 @@
package null
import (
"errors"
"github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/types"
)
// TxIndex acts as a /dev/null.
type TxIndex struct{}
// Tx panics.
func (txi *TxIndex) Get(hash []byte) (*types.TxResult, error) {
return nil, errors.New(`Indexing is disabled (set 'tx_indexer = "kv"' in config)`)
}
// Batch returns nil.
func (txi *TxIndex) AddBatch(batch *txindex.Batch) error {
return nil
}