Compare commits

...

8 Commits

Author SHA1 Message Date
Ismail Khoffi
ecf1d3980a Apply suggestions from code review
Co-Authored-By: melekes <anton.kalyaev@gmail.com>
2019-03-26 18:17:53 +01:00
Ismail Khoffi
ca70680728 Update rpc/core/blocks.go
Co-Authored-By: melekes <anton.kalyaev@gmail.com>
2019-03-26 18:12:59 +01:00
Anton Kaliaev
be1b759d2f Merge branch 'develop' into 2491-delete-old-results 2019-03-26 17:52:37 +01:00
Anton Kaliaev
9161d9fd14 Merge branch 'develop' into 2491-delete-old-results 2018-10-08 10:18:28 +04:00
Anton Kaliaev
45c6f2fec7 Merge branch 'develop' into 2491-delete-old-results 2018-10-03 11:36:48 +04:00
Anton Kaliaev
0ff6526975 const abciResponses key 2018-09-27 12:00:51 +04:00
Anton Kaliaev
5afe5c4af5 [rpc] use indexer while fetching block results 2018-09-27 11:53:26 +04:00
Anton Kaliaev
b1fbfebc79 [state] store ABCIResponses only for the last block
Refs https://github.com/tendermint/tendermint/issues/2491#issuecomment-424927721
2018-09-27 10:09:24 +04:00
11 changed files with 115 additions and 56 deletions

View File

@@ -5,6 +5,7 @@
### BREAKING CHANGES: ### BREAKING CHANGES:
* CLI/RPC/Config * CLI/RPC/Config
* [rpc] `/block_results` now uses indexer to fetch block results for heights < `latestBlockHeight`
* Apps * Apps
@@ -24,5 +25,5 @@
- [mempool] \#2778 No longer send txs back to peers who sent it to you - [mempool] \#2778 No longer send txs back to peers who sent it to you
### BUG FIXES: ### BUG FIXES:
- [state] \#2491 Store ABCIResponses only for the last block
- [blockchain] \#2699 update the maxHeight when a peer is removed - [blockchain] \#2699 update the maxHeight when a peer is removed

View File

@@ -381,7 +381,7 @@ func (h *Handshaker) ReplayBlocks(
} else if appBlockHeight == storeBlockHeight { } else if appBlockHeight == storeBlockHeight {
// We ran Commit, but didn't save the state, so replayBlock with mock app // We ran Commit, but didn't save the state, so replayBlock with mock app
abciResponses, err := sm.LoadABCIResponses(h.stateDB, storeBlockHeight) abciResponses, err := sm.LoadABCIResponses(h.stateDB)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -3,10 +3,15 @@ package core
import ( import (
"fmt" "fmt"
"github.com/pkg/errors"
abci "github.com/tendermint/tendermint/abci/types"
cmn "github.com/tendermint/tendermint/libs/common" cmn "github.com/tendermint/tendermint/libs/common"
tmquery "github.com/tendermint/tendermint/libs/pubsub/query"
ctypes "github.com/tendermint/tendermint/rpc/core/types" ctypes "github.com/tendermint/tendermint/rpc/core/types"
rpctypes "github.com/tendermint/tendermint/rpc/lib/types" rpctypes "github.com/tendermint/tendermint/rpc/lib/types"
sm "github.com/tendermint/tendermint/state" sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/state/txindex/null"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
@@ -338,6 +343,10 @@ func Commit(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultCommit, erro
// BlockResults gets ABCIResults at a given height. // BlockResults gets ABCIResults at a given height.
// If no height is provided, it will fetch results for the latest block. // If no height is provided, it will fetch results for the latest block.
// //
// Results for the latest height are fetched from the state database, for
// earlier heights - using the indexer. Note: indexer does not store EndBlock
// ("end_block": null).
//
// Results are for the height of the block containing the txs. // Results are for the height of the block containing the txs.
// Thus response.results[5] is the results of executing getBlock(h).Txs[5] // Thus response.results[5] is the results of executing getBlock(h).Txs[5]
// //
@@ -380,17 +389,63 @@ func BlockResults(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlockR
return nil, err return nil, err
} }
// load the results // for last height, use state DB
results, err := sm.LoadABCIResponses(stateDB, height) if height == storeHeight {
if err != nil { results, err := sm.LoadABCIResponses(stateDB)
return nil, err if err != nil {
return nil, err
}
return &ctypes.ResultBlockResults{
Height: height,
Results: results,
}, nil
} }
res := &ctypes.ResultBlockResults{ // for other heights, use indexer
Height: height, // if index is disabled, return error
Results: results, if _, ok := txIndexer.(*null.TxIndex); ok {
return nil, fmt.Errorf("transaction indexing is disabled")
} }
return res, nil
var deliverTxs []*abci.ResponseDeliverTx
// If Tendermint is configured to index txs by height, use it for faster search.
if txIndexer.IsTagIndexed(types.TxHeightKey) {
results, err := txIndexer.Search(
tmquery.MustParse(fmt.Sprintf("%s = %d", types.TxHeightKey, height)),
)
if err != nil {
return nil, errors.Wrap(err, "failed to search indexer for txs")
}
deliverTxs = make([]*abci.ResponseDeliverTx, len(results))
for i, r := range results {
deliverTxs[i] = &r.Result
}
} else { // otherwise, load the whole block and fetch txs one by one
block := blockStore.LoadBlock(height)
if block != nil {
deliverTxs = make([]*abci.ResponseDeliverTx, len(block.Data.Txs))
for i, tx := range block.Data.Txs {
r, err := txIndexer.Get(tx.Hash())
if err != nil {
return nil, errors.Wrapf(err, "failed to get Tx (%X) from indexer", tx.Hash())
}
if r == nil {
return nil, fmt.Errorf("Tx (%X) not found", tx.Hash())
}
deliverTxs[i] = &r.Result
}
}
}
return &ctypes.ResultBlockResults{
Height: height,
Results: &sm.ABCIResponses{
DeliverTx: deliverTxs,
EndBlock: nil,
},
}, nil
} }
func getHeight(currentHeight int64, heightPtr *int64) (int64, error) { func getHeight(currentHeight int64, heightPtr *int64) (int64, error) {

View File

@@ -1,6 +1,9 @@
package state package state
import "fmt" import (
"errors"
"fmt"
)
type ( type (
ErrInvalidBlock error ErrInvalidBlock error
@@ -39,10 +42,10 @@ type (
ErrNoConsensusParamsForHeight struct { ErrNoConsensusParamsForHeight struct {
Height int64 Height int64
} }
)
ErrNoABCIResponsesForHeight struct { var (
Height int64 ErrNoABCIResponses = errors.New("could not find results")
}
) )
func (e ErrUnknownBlock) Error() string { func (e ErrUnknownBlock) Error() string {
@@ -71,7 +74,3 @@ func (e ErrNoValSetForHeight) Error() string {
func (e ErrNoConsensusParamsForHeight) Error() string { func (e ErrNoConsensusParamsForHeight) Error() string {
return fmt.Sprintf("Could not find consensus params for height #%d", e.Height) return fmt.Sprintf("Could not find consensus params for height #%d", e.Height)
} }
func (e ErrNoABCIResponsesForHeight) Error() string {
return fmt.Sprintf("Could not find results for height #%d", e.Height)
}

View File

@@ -126,7 +126,7 @@ func (blockExec *BlockExecutor) ApplyBlock(state State, blockID types.BlockID, b
fail.Fail() // XXX fail.Fail() // XXX
// Save the results before we commit. // Save the results before we commit.
saveABCIResponses(blockExec.db, block.Height, abciResponses) saveABCIResponses(blockExec.db,abciResponses)
fail.Fail() // XXX fail.Fail() // XXX

View File

@@ -32,7 +32,7 @@ func TestApplyBlock(t *testing.T) {
cc := proxy.NewLocalClientCreator(kvstore.NewKVStoreApplication()) cc := proxy.NewLocalClientCreator(kvstore.NewKVStoreApplication())
proxyApp := proxy.NewAppConns(cc) proxyApp := proxy.NewAppConns(cc)
err := proxyApp.Start() err := proxyApp.Start()
require.Nil(t, err) require.NoError(t, err)
defer proxyApp.Stop() defer proxyApp.Stop()
state, stateDB := state(1, 1) state, stateDB := state(1, 1)
@@ -45,7 +45,13 @@ func TestApplyBlock(t *testing.T) {
//nolint:ineffassign //nolint:ineffassign
state, err = blockExec.ApplyBlock(state, blockID, block) state, err = blockExec.ApplyBlock(state, blockID, block)
require.Nil(t, err) require.NoError(t, err)
// test we save results
abciResponses, err := LoadABCIResponses(stateDB)
if assert.NoError(t, err) {
assert.NotNil(t, abciResponses)
}
// TODO check state and mempool // TODO check state and mempool
} }

View File

@@ -12,12 +12,11 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
abci "github.com/tendermint/tendermint/abci/types" abci "github.com/tendermint/tendermint/abci/types"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/crypto" "github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/crypto/ed25519" "github.com/tendermint/tendermint/crypto/ed25519"
cmn "github.com/tendermint/tendermint/libs/common" cmn "github.com/tendermint/tendermint/libs/common"
dbm "github.com/tendermint/tendermint/libs/db" dbm "github.com/tendermint/tendermint/libs/db"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
@@ -99,8 +98,8 @@ func TestABCIResponsesSaveLoad1(t *testing.T) {
types.TM2PB.NewValidatorUpdate(ed25519.GenPrivKey().PubKey(), 10), types.TM2PB.NewValidatorUpdate(ed25519.GenPrivKey().PubKey(), 10),
}} }}
saveABCIResponses(stateDB, block.Height, abciResponses) saveABCIResponses(stateDB, abciResponses)
loadedABCIResponses, err := LoadABCIResponses(stateDB, block.Height) loadedABCIResponses, err := LoadABCIResponses(stateDB)
assert.Nil(err) assert.Nil(err)
assert.Equal(abciResponses, loadedABCIResponses, assert.Equal(abciResponses, loadedABCIResponses,
fmt.Sprintf("ABCIResponses don't match:\ngot: %v\nexpected: %v\n", fmt.Sprintf("ABCIResponses don't match:\ngot: %v\nexpected: %v\n",
@@ -111,8 +110,6 @@ func TestABCIResponsesSaveLoad1(t *testing.T) {
func TestABCIResponsesSaveLoad2(t *testing.T) { func TestABCIResponsesSaveLoad2(t *testing.T) {
tearDown, stateDB, _ := setupTestCase(t) tearDown, stateDB, _ := setupTestCase(t)
defer tearDown(t) defer tearDown(t)
// nolint: vetshadow
assert := assert.New(t)
cases := [...]struct { cases := [...]struct {
// Height is implied to equal index+2, // Height is implied to equal index+2,
@@ -150,29 +147,19 @@ func TestABCIResponsesSaveLoad2(t *testing.T) {
}, },
} }
// Query all before, this should return error. res, err := LoadABCIResponses(stateDB)
for i := range cases { assert.Error(t, err, "%#v", res)
h := int64(i + 1)
res, err := LoadABCIResponses(stateDB, h)
assert.Error(err, "%d: %#v", i, res)
}
// Add all cases.
for i, tc := range cases { for i, tc := range cases {
h := int64(i + 1) // last block height, one below what we save
responses := &ABCIResponses{ responses := &ABCIResponses{
DeliverTx: tc.added, DeliverTx: tc.added,
EndBlock: &abci.ResponseEndBlock{}, EndBlock: &abci.ResponseEndBlock{},
} }
saveABCIResponses(stateDB, h, responses) saveABCIResponses(stateDB, responses)
}
// Query all before, should return expected value. res, err = LoadABCIResponses(stateDB)
for i, tc := range cases { assert.NoError(t, err, "%d", i)
h := int64(i + 1) assert.Equal(t, tc.expected.Hash(), res.ResultsHash(), "%d", i)
res, err := LoadABCIResponses(stateDB, h)
assert.NoError(err, "%d", i)
assert.Equal(tc.expected.Hash(), res.ResultsHash(), "%d", i)
} }
} }

View File

@@ -9,6 +9,10 @@ import (
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
var (
abciResponsesKey = []byte("abciResponsesKey")
)
//------------------------------------------------------------------------ //------------------------------------------------------------------------
func calcValidatorsKey(height int64) []byte { func calcValidatorsKey(height int64) []byte {
@@ -19,10 +23,6 @@ func calcConsensusParamsKey(height int64) []byte {
return []byte(fmt.Sprintf("consensusParamsKey:%v", height)) return []byte(fmt.Sprintf("consensusParamsKey:%v", height))
} }
func calcABCIResponsesKey(height int64) []byte {
return []byte(fmt.Sprintf("abciResponsesKey:%v", height))
}
// LoadStateFromDBOrGenesisFile loads the most recent state from the database, // LoadStateFromDBOrGenesisFile loads the most recent state from the database,
// or creates a new one from the given genesisFilePath and persists the result // or creates a new one from the given genesisFilePath and persists the result
// to the database. // to the database.
@@ -137,10 +137,10 @@ func (arz *ABCIResponses) ResultsHash() []byte {
// LoadABCIResponses loads the ABCIResponses for the given height from the database. // LoadABCIResponses loads the ABCIResponses for the given height from the database.
// This is useful for recovering from crashes where we called app.Commit and before we called // This is useful for recovering from crashes where we called app.Commit and before we called
// s.Save(). It can also be used to produce Merkle proofs of the result of txs. // s.Save(). It can also be used to produce Merkle proofs of the result of txs.
func LoadABCIResponses(db dbm.DB, height int64) (*ABCIResponses, error) { func LoadABCIResponses(db dbm.DB) (*ABCIResponses, error) {
buf := db.Get(calcABCIResponsesKey(height)) buf := db.Get(abciResponsesKey)
if len(buf) == 0 { if len(buf) == 0 {
return nil, ErrNoABCIResponsesForHeight{height} return nil, ErrNoABCIResponses
} }
abciResponses := new(ABCIResponses) abciResponses := new(ABCIResponses)
@@ -155,11 +155,10 @@ func LoadABCIResponses(db dbm.DB, height int64) (*ABCIResponses, error) {
return abciResponses, nil return abciResponses, nil
} }
// SaveABCIResponses persists the ABCIResponses to the database. // saveABCIResponses persists the ABCIResponses to the database.
// This is useful in case we crash after app.Commit and before s.Save(). // This is useful in case we crash after app.Commit and before s.Save().
// Responses are indexed by height so they can also be loaded later to produce Merkle proofs. func saveABCIResponses(db dbm.DB, abciResponses *ABCIResponses) {
func saveABCIResponses(db dbm.DB, height int64, abciResponses *ABCIResponses) { db.SetSync(abciResponsesKey, abciResponses.Bytes())
db.SetSync(calcABCIResponsesKey(height), abciResponses.Bytes())
} }
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------

View File

@@ -9,7 +9,6 @@ import (
// TxIndexer interface defines methods to index and search transactions. // TxIndexer interface defines methods to index and search transactions.
type TxIndexer interface { type TxIndexer interface {
// AddBatch analyzes, indexes and stores a batch of transactions. // AddBatch analyzes, indexes and stores a batch of transactions.
AddBatch(b *Batch) error AddBatch(b *Batch) error
@@ -22,6 +21,9 @@ type TxIndexer interface {
// Search allows you to query for transactions. // Search allows you to query for transactions.
Search(q *query.Query) ([]*types.TxResult, error) Search(q *query.Query) ([]*types.TxResult, error)
// IsTagIndexed returns true if the tag is being indexed, false - otherwise.
IsTagIndexed(tag string) bool
} }
//---------------------------------------------------- //----------------------------------------------------

View File

@@ -10,9 +10,9 @@ import (
"time" "time"
"github.com/pkg/errors" "github.com/pkg/errors"
cmn "github.com/tendermint/tendermint/libs/common" cmn "github.com/tendermint/tendermint/libs/common"
dbm "github.com/tendermint/tendermint/libs/db" dbm "github.com/tendermint/tendermint/libs/db"
"github.com/tendermint/tendermint/libs/pubsub/query" "github.com/tendermint/tendermint/libs/pubsub/query"
"github.com/tendermint/tendermint/state/txindex" "github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
@@ -54,6 +54,11 @@ func IndexAllTags() func(*TxIndex) {
} }
} }
// IsTagIndexed returns true if the tag is being indexed, false - otherwise.
func (txi *TxIndex) IsTagIndexed(tag string) bool {
return txi.indexAllTags || cmn.StringInSlice(tag, txi.tagsToIndex)
}
// Get gets transaction from the TxIndex storage and returns it or nil if the // Get gets transaction from the TxIndex storage and returns it or nil if the
// transaction is not found. // transaction is not found.
func (txi *TxIndex) Get(hash []byte) (*types.TxResult, error) { func (txi *TxIndex) Get(hash []byte) (*types.TxResult, error) {

View File

@@ -13,6 +13,11 @@ var _ txindex.TxIndexer = (*TxIndex)(nil)
// TxIndex acts as a /dev/null. // TxIndex acts as a /dev/null.
type TxIndex struct{} type TxIndex struct{}
// IsTagIndexed always returns false.
func (txi *TxIndex) IsTagIndexed(tag string) bool {
return false
}
// Get on a TxIndex is disabled and panics when invoked. // Get on a TxIndex is disabled and panics when invoked.
func (txi *TxIndex) Get(hash []byte) (*types.TxResult, error) { func (txi *TxIndex) Get(hash []byte) (*types.TxResult, error) {
return nil, errors.New(`Indexing is disabled (set 'tx_index = "kv"' in config)`) return nil, errors.New(`Indexing is disabled (set 'tx_index = "kv"' in config)`)