Compare commits

...

8 Commits

Author SHA1 Message Date
Ismail Khoffi
ecf1d3980a Apply suggestions from code review
Co-Authored-By: melekes <anton.kalyaev@gmail.com>
2019-03-26 18:17:53 +01:00
Ismail Khoffi
ca70680728 Update rpc/core/blocks.go
Co-Authored-By: melekes <anton.kalyaev@gmail.com>
2019-03-26 18:12:59 +01:00
Anton Kaliaev
be1b759d2f Merge branch 'develop' into 2491-delete-old-results 2019-03-26 17:52:37 +01:00
Anton Kaliaev
9161d9fd14 Merge branch 'develop' into 2491-delete-old-results 2018-10-08 10:18:28 +04:00
Anton Kaliaev
45c6f2fec7 Merge branch 'develop' into 2491-delete-old-results 2018-10-03 11:36:48 +04:00
Anton Kaliaev
0ff6526975 const abciResponses key 2018-09-27 12:00:51 +04:00
Anton Kaliaev
5afe5c4af5 [rpc] use indexer while fetching block results 2018-09-27 11:53:26 +04:00
Anton Kaliaev
b1fbfebc79 [state] store ABCIResponses only for the last block
Refs https://github.com/tendermint/tendermint/issues/2491#issuecomment-424927721
2018-09-27 10:09:24 +04:00
11 changed files with 115 additions and 56 deletions

View File

@@ -5,6 +5,7 @@
### BREAKING CHANGES:
* CLI/RPC/Config
* [rpc] `/block_results` now uses indexer to fetch block results for heights < `latestBlockHeight`
* Apps
@@ -24,5 +25,5 @@
- [mempool] \#2778 No longer send txs back to peers who sent it to you
### BUG FIXES:
- [state] \#2491 Store ABCIResponses only for the last block
- [blockchain] \#2699 update the maxHeight when a peer is removed

View File

@@ -381,7 +381,7 @@ func (h *Handshaker) ReplayBlocks(
} else if appBlockHeight == storeBlockHeight {
// We ran Commit, but didn't save the state, so replayBlock with mock app
abciResponses, err := sm.LoadABCIResponses(h.stateDB, storeBlockHeight)
abciResponses, err := sm.LoadABCIResponses(h.stateDB)
if err != nil {
return nil, err
}

View File

@@ -3,10 +3,15 @@ package core
import (
"fmt"
"github.com/pkg/errors"
abci "github.com/tendermint/tendermint/abci/types"
cmn "github.com/tendermint/tendermint/libs/common"
tmquery "github.com/tendermint/tendermint/libs/pubsub/query"
ctypes "github.com/tendermint/tendermint/rpc/core/types"
rpctypes "github.com/tendermint/tendermint/rpc/lib/types"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/state/txindex/null"
"github.com/tendermint/tendermint/types"
)
@@ -338,6 +343,10 @@ func Commit(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultCommit, erro
// BlockResults gets ABCIResults at a given height.
// If no height is provided, it will fetch results for the latest block.
//
// Results for the latest height are fetched from the state database, for
// earlier heights - using the indexer. Note: indexer does not store EndBlock
// ("end_block": null).
//
// Results are for the height of the block containing the txs.
// Thus response.results[5] is the results of executing getBlock(h).Txs[5]
//
@@ -380,17 +389,63 @@ func BlockResults(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlockR
return nil, err
}
// load the results
results, err := sm.LoadABCIResponses(stateDB, height)
if err != nil {
return nil, err
// for last height, use state DB
if height == storeHeight {
results, err := sm.LoadABCIResponses(stateDB)
if err != nil {
return nil, err
}
return &ctypes.ResultBlockResults{
Height: height,
Results: results,
}, nil
}
res := &ctypes.ResultBlockResults{
Height: height,
Results: results,
// for other heights, use indexer
// if index is disabled, return error
if _, ok := txIndexer.(*null.TxIndex); ok {
return nil, fmt.Errorf("transaction indexing is disabled")
}
return res, nil
var deliverTxs []*abci.ResponseDeliverTx
// If Tendermint is configured to index txs by height, use it for faster search.
if txIndexer.IsTagIndexed(types.TxHeightKey) {
results, err := txIndexer.Search(
tmquery.MustParse(fmt.Sprintf("%s = %d", types.TxHeightKey, height)),
)
if err != nil {
return nil, errors.Wrap(err, "failed to search indexer for txs")
}
deliverTxs = make([]*abci.ResponseDeliverTx, len(results))
for i, r := range results {
deliverTxs[i] = &r.Result
}
} else { // otherwise, load the whole block and fetch txs one by one
block := blockStore.LoadBlock(height)
if block != nil {
deliverTxs = make([]*abci.ResponseDeliverTx, len(block.Data.Txs))
for i, tx := range block.Data.Txs {
r, err := txIndexer.Get(tx.Hash())
if err != nil {
return nil, errors.Wrapf(err, "failed to get Tx (%X) from indexer", tx.Hash())
}
if r == nil {
return nil, fmt.Errorf("Tx (%X) not found", tx.Hash())
}
deliverTxs[i] = &r.Result
}
}
}
return &ctypes.ResultBlockResults{
Height: height,
Results: &sm.ABCIResponses{
DeliverTx: deliverTxs,
EndBlock: nil,
},
}, nil
}
func getHeight(currentHeight int64, heightPtr *int64) (int64, error) {

View File

@@ -1,6 +1,9 @@
package state
import "fmt"
import (
"errors"
"fmt"
)
type (
ErrInvalidBlock error
@@ -39,10 +42,10 @@ type (
ErrNoConsensusParamsForHeight struct {
Height int64
}
)
ErrNoABCIResponsesForHeight struct {
Height int64
}
var (
ErrNoABCIResponses = errors.New("could not find results")
)
func (e ErrUnknownBlock) Error() string {
@@ -71,7 +74,3 @@ func (e ErrNoValSetForHeight) Error() string {
func (e ErrNoConsensusParamsForHeight) Error() string {
return fmt.Sprintf("Could not find consensus params for height #%d", e.Height)
}
func (e ErrNoABCIResponsesForHeight) Error() string {
return fmt.Sprintf("Could not find results for height #%d", e.Height)
}

View File

@@ -126,7 +126,7 @@ func (blockExec *BlockExecutor) ApplyBlock(state State, blockID types.BlockID, b
fail.Fail() // XXX
// Save the results before we commit.
saveABCIResponses(blockExec.db, block.Height, abciResponses)
saveABCIResponses(blockExec.db,abciResponses)
fail.Fail() // XXX

View File

@@ -32,7 +32,7 @@ func TestApplyBlock(t *testing.T) {
cc := proxy.NewLocalClientCreator(kvstore.NewKVStoreApplication())
proxyApp := proxy.NewAppConns(cc)
err := proxyApp.Start()
require.Nil(t, err)
require.NoError(t, err)
defer proxyApp.Stop()
state, stateDB := state(1, 1)
@@ -45,7 +45,13 @@ func TestApplyBlock(t *testing.T) {
//nolint:ineffassign
state, err = blockExec.ApplyBlock(state, blockID, block)
require.Nil(t, err)
require.NoError(t, err)
// test we save results
abciResponses, err := LoadABCIResponses(stateDB)
if assert.NoError(t, err) {
assert.NotNil(t, abciResponses)
}
// TODO check state and mempool
}

View File

@@ -12,12 +12,11 @@ import (
"github.com/stretchr/testify/require"
abci "github.com/tendermint/tendermint/abci/types"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/crypto/ed25519"
cmn "github.com/tendermint/tendermint/libs/common"
dbm "github.com/tendermint/tendermint/libs/db"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/types"
)
@@ -99,8 +98,8 @@ func TestABCIResponsesSaveLoad1(t *testing.T) {
types.TM2PB.NewValidatorUpdate(ed25519.GenPrivKey().PubKey(), 10),
}}
saveABCIResponses(stateDB, block.Height, abciResponses)
loadedABCIResponses, err := LoadABCIResponses(stateDB, block.Height)
saveABCIResponses(stateDB, abciResponses)
loadedABCIResponses, err := LoadABCIResponses(stateDB)
assert.Nil(err)
assert.Equal(abciResponses, loadedABCIResponses,
fmt.Sprintf("ABCIResponses don't match:\ngot: %v\nexpected: %v\n",
@@ -111,8 +110,6 @@ func TestABCIResponsesSaveLoad1(t *testing.T) {
func TestABCIResponsesSaveLoad2(t *testing.T) {
tearDown, stateDB, _ := setupTestCase(t)
defer tearDown(t)
// nolint: vetshadow
assert := assert.New(t)
cases := [...]struct {
// Height is implied to equal index+2,
@@ -150,29 +147,19 @@ func TestABCIResponsesSaveLoad2(t *testing.T) {
},
}
// Query all before, this should return error.
for i := range cases {
h := int64(i + 1)
res, err := LoadABCIResponses(stateDB, h)
assert.Error(err, "%d: %#v", i, res)
}
res, err := LoadABCIResponses(stateDB)
assert.Error(t, err, "%#v", res)
// Add all cases.
for i, tc := range cases {
h := int64(i + 1) // last block height, one below what we save
responses := &ABCIResponses{
DeliverTx: tc.added,
EndBlock: &abci.ResponseEndBlock{},
}
saveABCIResponses(stateDB, h, responses)
}
saveABCIResponses(stateDB, responses)
// Query all before, should return expected value.
for i, tc := range cases {
h := int64(i + 1)
res, err := LoadABCIResponses(stateDB, h)
assert.NoError(err, "%d", i)
assert.Equal(tc.expected.Hash(), res.ResultsHash(), "%d", i)
res, err = LoadABCIResponses(stateDB)
assert.NoError(t, err, "%d", i)
assert.Equal(t, tc.expected.Hash(), res.ResultsHash(), "%d", i)
}
}

View File

@@ -9,6 +9,10 @@ import (
"github.com/tendermint/tendermint/types"
)
var (
abciResponsesKey = []byte("abciResponsesKey")
)
//------------------------------------------------------------------------
func calcValidatorsKey(height int64) []byte {
@@ -19,10 +23,6 @@ func calcConsensusParamsKey(height int64) []byte {
return []byte(fmt.Sprintf("consensusParamsKey:%v", height))
}
func calcABCIResponsesKey(height int64) []byte {
return []byte(fmt.Sprintf("abciResponsesKey:%v", height))
}
// LoadStateFromDBOrGenesisFile loads the most recent state from the database,
// or creates a new one from the given genesisFilePath and persists the result
// to the database.
@@ -137,10 +137,10 @@ func (arz *ABCIResponses) ResultsHash() []byte {
// LoadABCIResponses loads the ABCIResponses for the given height from the database.
// This is useful for recovering from crashes where we called app.Commit and before we called
// s.Save(). It can also be used to produce Merkle proofs of the result of txs.
func LoadABCIResponses(db dbm.DB, height int64) (*ABCIResponses, error) {
buf := db.Get(calcABCIResponsesKey(height))
func LoadABCIResponses(db dbm.DB) (*ABCIResponses, error) {
buf := db.Get(abciResponsesKey)
if len(buf) == 0 {
return nil, ErrNoABCIResponsesForHeight{height}
return nil, ErrNoABCIResponses
}
abciResponses := new(ABCIResponses)
@@ -155,11 +155,10 @@ func LoadABCIResponses(db dbm.DB, height int64) (*ABCIResponses, error) {
return abciResponses, nil
}
// SaveABCIResponses persists the ABCIResponses to the database.
// saveABCIResponses persists the ABCIResponses to the database.
// This is useful in case we crash after app.Commit and before s.Save().
// Responses are indexed by height so they can also be loaded later to produce Merkle proofs.
func saveABCIResponses(db dbm.DB, height int64, abciResponses *ABCIResponses) {
db.SetSync(calcABCIResponsesKey(height), abciResponses.Bytes())
func saveABCIResponses(db dbm.DB, abciResponses *ABCIResponses) {
db.SetSync(abciResponsesKey, abciResponses.Bytes())
}
//-----------------------------------------------------------------------------

View File

@@ -9,7 +9,6 @@ import (
// TxIndexer interface defines methods to index and search transactions.
type TxIndexer interface {
// AddBatch analyzes, indexes and stores a batch of transactions.
AddBatch(b *Batch) error
@@ -22,6 +21,9 @@ type TxIndexer interface {
// Search allows you to query for transactions.
Search(q *query.Query) ([]*types.TxResult, error)
// IsTagIndexed returns true if the tag is being indexed, false - otherwise.
IsTagIndexed(tag string) bool
}
//----------------------------------------------------

View File

@@ -10,9 +10,9 @@ import (
"time"
"github.com/pkg/errors"
cmn "github.com/tendermint/tendermint/libs/common"
dbm "github.com/tendermint/tendermint/libs/db"
"github.com/tendermint/tendermint/libs/pubsub/query"
"github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/types"
@@ -54,6 +54,11 @@ func IndexAllTags() func(*TxIndex) {
}
}
// IsTagIndexed returns true if the tag is being indexed, false - otherwise.
func (txi *TxIndex) IsTagIndexed(tag string) bool {
return txi.indexAllTags || cmn.StringInSlice(tag, txi.tagsToIndex)
}
// Get gets transaction from the TxIndex storage and returns it or nil if the
// transaction is not found.
func (txi *TxIndex) Get(hash []byte) (*types.TxResult, error) {

View File

@@ -13,6 +13,11 @@ var _ txindex.TxIndexer = (*TxIndex)(nil)
// TxIndex acts as a /dev/null.
type TxIndex struct{}
// IsTagIndexed always returns false.
func (txi *TxIndex) IsTagIndexed(tag string) bool {
return false
}
// Get on a TxIndex is disabled and panics when invoked.
func (txi *TxIndex) Get(hash []byte) (*types.TxResult, error) {
return nil, errors.New(`Indexing is disabled (set 'tx_index = "kv"' in config)`)