diff --git a/blockchain/pool_test.go b/blockchain/pool_test.go index 8e6dbee9..220bc5ce 100644 --- a/blockchain/pool_test.go +++ b/blockchain/pool_test.go @@ -35,6 +35,7 @@ func TestBasic(t *testing.T) { requestsCh := make(chan BlockRequest, 100) pool := NewBlockPool(start, requestsCh, timeoutsCh) pool.Start() + defer pool.Stop() // Introduce each peer. go func() { @@ -76,8 +77,6 @@ func TestBasic(t *testing.T) { }() } } - - pool.Stop() } func TestTimeout(t *testing.T) { @@ -87,6 +86,7 @@ func TestTimeout(t *testing.T) { requestsCh := make(chan BlockRequest, 100) pool := NewBlockPool(start, requestsCh, timeoutsCh) pool.Start() + defer pool.Stop() for _, peer := range peers { log.Info("Peer", "id", peer.id) @@ -131,6 +131,4 @@ func TestTimeout(t *testing.T) { log.Info("TEST: Pulled new BlockRequest", "request", request) } } - - pool.Stop() } diff --git a/consensus/state.go b/consensus/state.go index 63616cbd..d6943581 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -283,7 +283,7 @@ func NewConsensusState(config cfg.Config, state *sm.State, proxyAppConn proxy.Ap //---------------------------------------- // Public interface -// implements events.Eventable +// SetEventSwitch implements events.Eventable func (cs *ConsensusState) SetEventSwitch(evsw types.EventSwitch) { cs.evsw = evsw } diff --git a/glide.lock b/glide.lock index e7988802..71b549db 100644 --- a/glide.lock +++ b/glide.lock @@ -164,4 +164,12 @@ imports: - stats - tap - transport -testImports: [] +testImports: +- name: github.com/davecgh/go-spew + version: 6d212800a42e8ab5c146b8ace3490ee17e5225f9 + subpackages: + - spew +- name: github.com/pmezard/go-difflib + version: d8ed2627bdf02c080bf22230dbb337003b7aba2d + subpackages: + - difflib diff --git a/node/node.go b/node/node.go index 6815c779..7a88f2d0 100644 --- a/node/node.go +++ b/node/node.go @@ -82,7 +82,7 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreato } // reload the state (it may have been updated by the handshake) - state = sm.LoadState(stateDB) + state = sm.GetState(config, stateDB) // Generate node PrivKey privKey := crypto.GenPrivKeyEd25519() diff --git a/state/execution.go b/state/execution.go index aa911301..0a82c6da 100644 --- a/state/execution.go +++ b/state/execution.go @@ -2,26 +2,26 @@ package state import ( "errors" + "fmt" - "github.com/ebuchman/fail-test" - + fail "github.com/ebuchman/fail-test" abci "github.com/tendermint/abci/types" . "github.com/tendermint/go-common" - "github.com/tendermint/go-crypto" + crypto "github.com/tendermint/go-crypto" "github.com/tendermint/tendermint/proxy" + txindexer "github.com/tendermint/tendermint/state/tx/indexer" "github.com/tendermint/tendermint/types" ) -//-------------------------------------------------- -// Execute the block - -// Execute the block to mutate State. -// Validates block and then executes Data.Txs in the block. -func (s *State) ExecBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block, blockPartsHeader types.PartSetHeader) error { - +// ExecBlock executes the block to mutate State. +// + validates the block +// + executes block.Txs on the proxyAppConn +// + updates validator sets +// + returns block.Txs results +func (s *State) ExecBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block, blockPartsHeader types.PartSetHeader) ([]*types.TxResult, error) { // Validate the block. if err := s.validateBlock(block); err != nil { - return ErrInvalidBlock(err) + return nil, ErrInvalidBlock(err) } // compute bitarray of validators that signed @@ -33,11 +33,11 @@ func (s *State) ExecBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnC nextValSet := valSet.Copy() // Execute the block txs - changedValidators, err := execBlockOnProxyApp(eventCache, proxyAppConn, block) + txResults, changedValidators, err := execBlockOnProxyApp(eventCache, proxyAppConn, block) if err != nil { // There was some error in proxyApp // TODO Report error and wait for proxyApp to be available. - return ErrProxyAppConn(err) + return nil, ErrProxyAppConn(err) } // update the validator set @@ -54,16 +54,22 @@ func (s *State) ExecBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnC fail.Fail() // XXX - return nil + return txResults, nil } // Executes block's transactions on proxyAppConn. -// Returns a list of updates to the validator set +// Returns a list of transaction results and updates to the validator set // TODO: Generate a bitmap or otherwise store tx validity in state. -func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block) ([]*abci.Validator, error) { - +func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block) ([]*types.TxResult, []*abci.Validator, error) { var validTxs, invalidTxs = 0, 0 + txResults := make([]*types.TxResult, len(block.Txs)) + + txHashToIndexMap := make(map[string]int) + for index, tx := range block.Txs { + txHashToIndexMap[string(tx.Hash())] = index + } + // Execute transactions and get hash proxyCb := func(req *abci.Request, res *abci.Response) { switch r := res.Value.(type) { @@ -73,21 +79,28 @@ func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnCo // Blocks may include invalid txs. // reqDeliverTx := req.(abci.RequestDeliverTx) txError := "" - apTx := r.DeliverTx - if apTx.Code == abci.CodeType_OK { - validTxs += 1 + txResult := r.DeliverTx + if txResult.Code == abci.CodeType_OK { + validTxs++ } else { - log.Debug("Invalid tx", "code", r.DeliverTx.Code, "log", r.DeliverTx.Log) - invalidTxs += 1 - txError = apTx.Code.String() + log.Debug("Invalid tx", "code", txResult.Code, "log", txResult.Log) + invalidTxs++ + txError = txResult.Code.String() } + + tx := types.Tx(req.GetDeliverTx().Tx) + index, ok := txHashToIndexMap[string(tx.Hash())] + if ok { + txResults[index] = &types.TxResult{uint64(block.Height), uint32(index), *txResult} + } + // NOTE: if we count we can access the tx from the block instead of // pulling it from the req event := types.EventDataTx{ - Tx: req.GetDeliverTx().Tx, - Data: apTx.Data, - Code: apTx.Code, - Log: apTx.Log, + Tx: tx, + Data: txResult.Data, + Code: txResult.Code, + Log: txResult.Log, Error: txError, } types.FireEventTx(eventCache, event) @@ -99,7 +112,7 @@ func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnCo err := proxyAppConn.BeginBlockSync(block.Hash(), types.TM2PB.Header(block.Header)) if err != nil { log.Warn("Error in proxyAppConn.BeginBlock", "error", err) - return nil, err + return nil, nil, err } fail.Fail() // XXX @@ -109,7 +122,7 @@ func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnCo fail.FailRand(len(block.Txs)) // XXX proxyAppConn.DeliverTxAsync(tx) if err := proxyAppConn.Error(); err != nil { - return nil, err + return nil, nil, err } } @@ -119,7 +132,7 @@ func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnCo respEndBlock, err := proxyAppConn.EndBlockSync(uint64(block.Height)) if err != nil { log.Warn("Error in proxyAppConn.EndBlock", "error", err) - return nil, err + return nil, nil, err } fail.Fail() // XXX @@ -128,7 +141,7 @@ func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnCo if len(respEndBlock.Diffs) > 0 { log.Info("Update to validator set", "updates", abci.ValidatorsString(respEndBlock.Diffs)) } - return respEndBlock.Diffs, nil + return txResults, respEndBlock.Diffs, nil } func updateValidators(validators *types.ValidatorSet, changedValidators []*abci.Validator) error { @@ -218,26 +231,31 @@ func (s *State) validateBlock(block *types.Block) error { return nil } -//----------------------------------------------------------------------------- -// ApplyBlock executes the block, then commits and updates the mempool atomically - -// Execute and commit block against app, save block and state +// ApplyBlock executes the block, then commits and updates the mempool +// atomically, optionally indexing transaction results. func (s *State) ApplyBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block, partsHeader types.PartSetHeader, mempool types.Mempool) error { - // Run the block on the State: - // + update validator sets - // + run txs on the proxyAppConn - err := s.ExecBlock(eventCache, proxyAppConn, block, partsHeader) + txResults, err := s.ExecBlock(eventCache, proxyAppConn, block, partsHeader) if err != nil { - return errors.New(Fmt("Exec failed for application: %v", err)) + return fmt.Errorf("Exec failed for application: %v", err) } // lock mempool, commit state, update mempoool err = s.CommitStateUpdateMempool(proxyAppConn, block, mempool) if err != nil { - return errors.New(Fmt("Commit failed for application: %v", err)) + return fmt.Errorf("Commit failed for application: %v", err) } + + batch := txindexer.NewBatch() + for i, r := range txResults { + if r != nil { + tx := block.Txs[i] + batch.Index(string(tx.Hash()), *r) + } + } + s.TxIndexer.Batch(batch) + return nil } @@ -272,7 +290,7 @@ func (s *State) CommitStateUpdateMempool(proxyAppConn proxy.AppConnConsensus, bl // Returns the application root hash (result of abci.Commit) func ApplyBlock(appConnConsensus proxy.AppConnConsensus, block *types.Block) ([]byte, error) { var eventCache types.Fireable // nil - _, err := execBlockOnProxyApp(eventCache, appConnConsensus, block) + _, _, err := execBlockOnProxyApp(eventCache, appConnConsensus, block) if err != nil { log.Warn("Error executing block on proxy app", "height", block.Height, "err", err) return nil, err diff --git a/state/state.go b/state/state.go index c4c6d748..892c3874 100644 --- a/state/state.go +++ b/state/state.go @@ -10,6 +10,8 @@ import ( cfg "github.com/tendermint/go-config" dbm "github.com/tendermint/go-db" "github.com/tendermint/go-wire" + "github.com/tendermint/tendermint/state/tx" + txindexer "github.com/tendermint/tendermint/state/tx/indexer" "github.com/tendermint/tendermint/types" ) @@ -38,6 +40,8 @@ type State struct { // AppHash is updated after Commit AppHash []byte + + TxIndexer tx.Indexer `json:"-"` // Transaction indexer. } func LoadState(db dbm.DB) *State { @@ -72,6 +76,7 @@ func (s *State) Copy() *State { Validators: s.Validators.Copy(), LastValidators: s.LastValidators.Copy(), AppHash: s.AppHash, + TxIndexer: s.TxIndexer, // pointer here, not value } } @@ -125,12 +130,20 @@ func GetState(config cfg.Config, stateDB dbm.DB) *State { state = MakeGenesisStateFromFile(stateDB, config.GetString("genesis_file")) state.Save() } + + // Transaction indexing + store := dbm.NewDB("tx_indexer", config.GetString("db_backend"), config.GetString("db_dir")) + state.TxIndexer = txindexer.NewKV(store) + return state } //----------------------------------------------------------------------------- // Genesis +// MakeGenesisStateFromFile reads and unmarshals state from the given file. +// +// Used during replay and in tests. func MakeGenesisStateFromFile(db dbm.DB, genDocFile string) *State { genDocJSON, err := ioutil.ReadFile(genDocFile) if err != nil { @@ -143,6 +156,9 @@ func MakeGenesisStateFromFile(db dbm.DB, genDocFile string) *State { return MakeGenesisState(db, genDoc) } +// MakeGenesisState creates state from types.GenesisDoc. +// +// Used in tests. func MakeGenesisState(db dbm.DB, genDoc *types.GenesisDoc) *State { if len(genDoc.Validators) == 0 { Exit(Fmt("The genesis file has no validators")) @@ -176,5 +192,6 @@ func MakeGenesisState(db dbm.DB, genDoc *types.GenesisDoc) *State { Validators: types.NewValidatorSet(validators), LastValidators: types.NewValidatorSet(nil), AppHash: genDoc.AppHash, + TxIndexer: &txindexer.Null{}, // we do not need indexer during replay and in tests } } diff --git a/state/tx/indexer.go b/state/tx/indexer.go new file mode 100644 index 00000000..759f141a --- /dev/null +++ b/state/tx/indexer.go @@ -0,0 +1,21 @@ +package tx + +import ( + txindexer "github.com/tendermint/tendermint/state/tx/indexer" + "github.com/tendermint/tendermint/types" +) + +// Indexer interface defines methods to index and search transactions. +type Indexer interface { + + // Batch analyzes, indexes or stores a batch of transactions. + // + // NOTE We do not specify Index method for analyzing a single transaction + // here because it bears heavy perfomance loses. Almost all advanced indexers + // support batching. + Batch(b *txindexer.Batch) error + + // Tx returns specified transaction or nil if the transaction is not indexed + // or stored. + Tx(hash string) (*types.TxResult, error) +} diff --git a/state/tx/indexer/batch.go b/state/tx/indexer/batch.go new file mode 100644 index 00000000..3c64c886 --- /dev/null +++ b/state/tx/indexer/batch.go @@ -0,0 +1,32 @@ +package indexer + +import "github.com/tendermint/tendermint/types" + +// A Batch groups together multiple Index operations you would like performed +// at the same time. The Batch structure is NOT thread-safe. You should only +// perform operations on a batch from a single thread at a time. Once batch +// execution has started, you may not modify it. +type Batch struct { + Ops map[string]types.TxResult +} + +// NewBatch creates a new Batch. +func NewBatch() *Batch { + return &Batch{ + Ops: make(map[string]types.TxResult), + } +} + +// Index adds or updates entry for the given hash. +func (b *Batch) Index(hash string, result types.TxResult) error { + if hash == "" { + return ErrorEmptyHash + } + b.Ops[hash] = result + return nil +} + +// Size returns the total number of operations inside the batch. +func (b *Batch) Size() int { + return len(b.Ops) +} diff --git a/state/tx/indexer/error.go b/state/tx/indexer/error.go new file mode 100644 index 00000000..9d20593d --- /dev/null +++ b/state/tx/indexer/error.go @@ -0,0 +1,6 @@ +package indexer + +import "errors" + +// ErrorEmptyHash indicates empty hash +var ErrorEmptyHash = errors.New("Transaction hash cannot be empty") diff --git a/state/tx/indexer/kv.go b/state/tx/indexer/kv.go new file mode 100644 index 00000000..0c86d0a3 --- /dev/null +++ b/state/tx/indexer/kv.go @@ -0,0 +1,55 @@ +package indexer + +import ( + "bytes" + "fmt" + + db "github.com/tendermint/go-db" + "github.com/tendermint/go-wire" + "github.com/tendermint/tendermint/types" +) + +// KV is a simplest possible indexer, backed by Key-Value storage (levelDB). +// It could only index transaction by its identifier. +type KV struct { + store db.DB +} + +// NewKV returns new instance of KV indexer. +func NewKV(store db.DB) *KV { + return &KV{store: store} +} + +// Tx gets transaction from the KV storage and returns it or nil if the +// transaction is not found. +func (indexer *KV) Tx(hash string) (*types.TxResult, error) { + if hash == "" { + return nil, ErrorEmptyHash + } + + rawBytes := indexer.store.Get([]byte(hash)) + if rawBytes == nil { + return nil, nil + } + + r := bytes.NewReader(rawBytes) + var n int + var err error + txResult := wire.ReadBinary(&types.TxResult{}, r, 0, &n, &err).(*types.TxResult) + if err != nil { + return nil, fmt.Errorf("Error reading TxResult: %v", err) + } + + return txResult, nil +} + +// Batch writes a batch of transactions into the KV storage. +func (indexer *KV) Batch(b *Batch) error { + storeBatch := indexer.store.NewBatch() + for hash, result := range b.Ops { + rawBytes := wire.BinaryBytes(&result) + storeBatch.Set([]byte(hash), rawBytes) + } + storeBatch.Write() + return nil +} diff --git a/state/tx/indexer/kv_test.go b/state/tx/indexer/kv_test.go new file mode 100644 index 00000000..0e3b0a7d --- /dev/null +++ b/state/tx/indexer/kv_test.go @@ -0,0 +1,61 @@ +package indexer + +import ( + "fmt" + "io/ioutil" + "os" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + abci "github.com/tendermint/abci/types" + db "github.com/tendermint/go-db" + "github.com/tendermint/tendermint/types" +) + +func TestKVIndex(t *testing.T) { + indexer := &KV{store: db.NewMemDB()} + + tx := types.Tx("HELLO WORLD") + txResult := &types.TxResult{1, 1, abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeType_OK, Log: ""}} + hash := string(tx.Hash()) + + batch := NewBatch() + batch.Index(hash, *txResult) + err := indexer.Batch(batch) + require.Nil(t, err) + + loadedTxResult, err := indexer.Tx(hash) + require.Nil(t, err) + assert.Equal(t, txResult, loadedTxResult) +} + +func benchmarkKVIndex(txsCount int, b *testing.B) { + txResult := &types.TxResult{1, 1, abci.ResponseDeliverTx{Data: []byte{0}, Code: abci.CodeType_OK, Log: ""}} + + dir, err := ioutil.TempDir("", "tx_indexer_db") + if err != nil { + b.Fatal(err) + } + defer os.RemoveAll(dir) + + store := db.NewDB("tx_indexer", "leveldb", dir) + indexer := &KV{store: store} + + batch := NewBatch() + for i := 0; i < txsCount; i++ { + batch.Index(fmt.Sprintf("hash%v", i), *txResult) + } + + b.ResetTimer() + + for n := 0; n < b.N; n++ { + err = indexer.Batch(batch) + } +} + +func BenchmarkKVIndex1(b *testing.B) { benchmarkKVIndex(1, b) } +func BenchmarkKVIndex500(b *testing.B) { benchmarkKVIndex(500, b) } +func BenchmarkKVIndex1000(b *testing.B) { benchmarkKVIndex(1000, b) } +func BenchmarkKVIndex2000(b *testing.B) { benchmarkKVIndex(2000, b) } +func BenchmarkKVIndex10000(b *testing.B) { benchmarkKVIndex(10000, b) } diff --git a/state/tx/indexer/null.go b/state/tx/indexer/null.go new file mode 100644 index 00000000..b196a705 --- /dev/null +++ b/state/tx/indexer/null.go @@ -0,0 +1,16 @@ +package indexer + +import "github.com/tendermint/tendermint/types" + +// Null acts as a /dev/null. +type Null struct{} + +// Tx panics. +func (indexer *Null) Tx(hash string) (*types.TxResult, error) { + panic("You are trying to get the transaction from a null indexer") +} + +// Batch returns nil. +func (indexer *Null) Batch(batch *Batch) error { + return nil +} diff --git a/types/tx_result.go b/types/tx_result.go new file mode 100644 index 00000000..c3ec0030 --- /dev/null +++ b/types/tx_result.go @@ -0,0 +1,14 @@ +package types + +import ( + abci "github.com/tendermint/abci/types" +) + +// TxResult contains results of executing the transaction. +// +// One usage is indexing transaction results. +type TxResult struct { + Height uint64 + Index uint32 + DeliverTxResponse abci.ResponseDeliverTx +}