mirror of
https://github.com/fluencelabs/tendermint
synced 2025-07-30 19:51:58 +00:00
Compare commits
8 Commits
v0.32.3
...
2491-delet
Author | SHA1 | Date | |
---|---|---|---|
|
ecf1d3980a | ||
|
ca70680728 | ||
|
be1b759d2f | ||
|
9161d9fd14 | ||
|
45c6f2fec7 | ||
|
0ff6526975 | ||
|
5afe5c4af5 | ||
|
b1fbfebc79 |
@@ -5,6 +5,7 @@
|
||||
### BREAKING CHANGES:
|
||||
|
||||
* CLI/RPC/Config
|
||||
* [rpc] `/block_results` now uses indexer to fetch block results for heights < `latestBlockHeight`
|
||||
|
||||
* Apps
|
||||
|
||||
@@ -24,5 +25,5 @@
|
||||
- [mempool] \#2778 No longer send txs back to peers who sent it to you
|
||||
|
||||
### BUG FIXES:
|
||||
|
||||
- [state] \#2491 Store ABCIResponses only for the last block
|
||||
- [blockchain] \#2699 update the maxHeight when a peer is removed
|
||||
|
@@ -381,7 +381,7 @@ func (h *Handshaker) ReplayBlocks(
|
||||
|
||||
} else if appBlockHeight == storeBlockHeight {
|
||||
// We ran Commit, but didn't save the state, so replayBlock with mock app
|
||||
abciResponses, err := sm.LoadABCIResponses(h.stateDB, storeBlockHeight)
|
||||
abciResponses, err := sm.LoadABCIResponses(h.stateDB)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@@ -3,10 +3,15 @@ package core
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
cmn "github.com/tendermint/tendermint/libs/common"
|
||||
tmquery "github.com/tendermint/tendermint/libs/pubsub/query"
|
||||
ctypes "github.com/tendermint/tendermint/rpc/core/types"
|
||||
rpctypes "github.com/tendermint/tendermint/rpc/lib/types"
|
||||
sm "github.com/tendermint/tendermint/state"
|
||||
"github.com/tendermint/tendermint/state/txindex/null"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
@@ -338,6 +343,10 @@ func Commit(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultCommit, erro
|
||||
// BlockResults gets ABCIResults at a given height.
|
||||
// If no height is provided, it will fetch results for the latest block.
|
||||
//
|
||||
// Results for the latest height are fetched from the state database, for
|
||||
// earlier heights - using the indexer. Note: indexer does not store EndBlock
|
||||
// ("end_block": null).
|
||||
//
|
||||
// Results are for the height of the block containing the txs.
|
||||
// Thus response.results[5] is the results of executing getBlock(h).Txs[5]
|
||||
//
|
||||
@@ -380,17 +389,63 @@ func BlockResults(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlockR
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// load the results
|
||||
results, err := sm.LoadABCIResponses(stateDB, height)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
// for last height, use state DB
|
||||
if height == storeHeight {
|
||||
results, err := sm.LoadABCIResponses(stateDB)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &ctypes.ResultBlockResults{
|
||||
Height: height,
|
||||
Results: results,
|
||||
}, nil
|
||||
}
|
||||
|
||||
res := &ctypes.ResultBlockResults{
|
||||
Height: height,
|
||||
Results: results,
|
||||
// for other heights, use indexer
|
||||
// if index is disabled, return error
|
||||
if _, ok := txIndexer.(*null.TxIndex); ok {
|
||||
return nil, fmt.Errorf("transaction indexing is disabled")
|
||||
}
|
||||
return res, nil
|
||||
|
||||
var deliverTxs []*abci.ResponseDeliverTx
|
||||
|
||||
// If Tendermint is configured to index txs by height, use it for faster search.
|
||||
if txIndexer.IsTagIndexed(types.TxHeightKey) {
|
||||
results, err := txIndexer.Search(
|
||||
tmquery.MustParse(fmt.Sprintf("%s = %d", types.TxHeightKey, height)),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed to search indexer for txs")
|
||||
}
|
||||
deliverTxs = make([]*abci.ResponseDeliverTx, len(results))
|
||||
for i, r := range results {
|
||||
deliverTxs[i] = &r.Result
|
||||
}
|
||||
} else { // otherwise, load the whole block and fetch txs one by one
|
||||
block := blockStore.LoadBlock(height)
|
||||
if block != nil {
|
||||
deliverTxs = make([]*abci.ResponseDeliverTx, len(block.Data.Txs))
|
||||
|
||||
for i, tx := range block.Data.Txs {
|
||||
r, err := txIndexer.Get(tx.Hash())
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to get Tx (%X) from indexer", tx.Hash())
|
||||
}
|
||||
if r == nil {
|
||||
return nil, fmt.Errorf("Tx (%X) not found", tx.Hash())
|
||||
}
|
||||
deliverTxs[i] = &r.Result
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return &ctypes.ResultBlockResults{
|
||||
Height: height,
|
||||
Results: &sm.ABCIResponses{
|
||||
DeliverTx: deliverTxs,
|
||||
EndBlock: nil,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func getHeight(currentHeight int64, heightPtr *int64) (int64, error) {
|
||||
|
@@ -1,6 +1,9 @@
|
||||
package state
|
||||
|
||||
import "fmt"
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
type (
|
||||
ErrInvalidBlock error
|
||||
@@ -39,10 +42,10 @@ type (
|
||||
ErrNoConsensusParamsForHeight struct {
|
||||
Height int64
|
||||
}
|
||||
)
|
||||
|
||||
ErrNoABCIResponsesForHeight struct {
|
||||
Height int64
|
||||
}
|
||||
var (
|
||||
ErrNoABCIResponses = errors.New("could not find results")
|
||||
)
|
||||
|
||||
func (e ErrUnknownBlock) Error() string {
|
||||
@@ -71,7 +74,3 @@ func (e ErrNoValSetForHeight) Error() string {
|
||||
func (e ErrNoConsensusParamsForHeight) Error() string {
|
||||
return fmt.Sprintf("Could not find consensus params for height #%d", e.Height)
|
||||
}
|
||||
|
||||
func (e ErrNoABCIResponsesForHeight) Error() string {
|
||||
return fmt.Sprintf("Could not find results for height #%d", e.Height)
|
||||
}
|
||||
|
@@ -126,7 +126,7 @@ func (blockExec *BlockExecutor) ApplyBlock(state State, blockID types.BlockID, b
|
||||
fail.Fail() // XXX
|
||||
|
||||
// Save the results before we commit.
|
||||
saveABCIResponses(blockExec.db, block.Height, abciResponses)
|
||||
saveABCIResponses(blockExec.db,abciResponses)
|
||||
|
||||
fail.Fail() // XXX
|
||||
|
||||
|
@@ -32,7 +32,7 @@ func TestApplyBlock(t *testing.T) {
|
||||
cc := proxy.NewLocalClientCreator(kvstore.NewKVStoreApplication())
|
||||
proxyApp := proxy.NewAppConns(cc)
|
||||
err := proxyApp.Start()
|
||||
require.Nil(t, err)
|
||||
require.NoError(t, err)
|
||||
defer proxyApp.Stop()
|
||||
|
||||
state, stateDB := state(1, 1)
|
||||
@@ -45,7 +45,13 @@ func TestApplyBlock(t *testing.T) {
|
||||
|
||||
//nolint:ineffassign
|
||||
state, err = blockExec.ApplyBlock(state, blockID, block)
|
||||
require.Nil(t, err)
|
||||
require.NoError(t, err)
|
||||
|
||||
// test we save results
|
||||
abciResponses, err := LoadABCIResponses(stateDB)
|
||||
if assert.NoError(t, err) {
|
||||
assert.NotNil(t, abciResponses)
|
||||
}
|
||||
|
||||
// TODO check state and mempool
|
||||
}
|
||||
|
@@ -12,12 +12,11 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
abci "github.com/tendermint/tendermint/abci/types"
|
||||
cfg "github.com/tendermint/tendermint/config"
|
||||
"github.com/tendermint/tendermint/crypto"
|
||||
"github.com/tendermint/tendermint/crypto/ed25519"
|
||||
cmn "github.com/tendermint/tendermint/libs/common"
|
||||
dbm "github.com/tendermint/tendermint/libs/db"
|
||||
|
||||
cfg "github.com/tendermint/tendermint/config"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
@@ -99,8 +98,8 @@ func TestABCIResponsesSaveLoad1(t *testing.T) {
|
||||
types.TM2PB.NewValidatorUpdate(ed25519.GenPrivKey().PubKey(), 10),
|
||||
}}
|
||||
|
||||
saveABCIResponses(stateDB, block.Height, abciResponses)
|
||||
loadedABCIResponses, err := LoadABCIResponses(stateDB, block.Height)
|
||||
saveABCIResponses(stateDB, abciResponses)
|
||||
loadedABCIResponses, err := LoadABCIResponses(stateDB)
|
||||
assert.Nil(err)
|
||||
assert.Equal(abciResponses, loadedABCIResponses,
|
||||
fmt.Sprintf("ABCIResponses don't match:\ngot: %v\nexpected: %v\n",
|
||||
@@ -111,8 +110,6 @@ func TestABCIResponsesSaveLoad1(t *testing.T) {
|
||||
func TestABCIResponsesSaveLoad2(t *testing.T) {
|
||||
tearDown, stateDB, _ := setupTestCase(t)
|
||||
defer tearDown(t)
|
||||
// nolint: vetshadow
|
||||
assert := assert.New(t)
|
||||
|
||||
cases := [...]struct {
|
||||
// Height is implied to equal index+2,
|
||||
@@ -150,29 +147,19 @@ func TestABCIResponsesSaveLoad2(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
// Query all before, this should return error.
|
||||
for i := range cases {
|
||||
h := int64(i + 1)
|
||||
res, err := LoadABCIResponses(stateDB, h)
|
||||
assert.Error(err, "%d: %#v", i, res)
|
||||
}
|
||||
res, err := LoadABCIResponses(stateDB)
|
||||
assert.Error(t, err, "%#v", res)
|
||||
|
||||
// Add all cases.
|
||||
for i, tc := range cases {
|
||||
h := int64(i + 1) // last block height, one below what we save
|
||||
responses := &ABCIResponses{
|
||||
DeliverTx: tc.added,
|
||||
EndBlock: &abci.ResponseEndBlock{},
|
||||
}
|
||||
saveABCIResponses(stateDB, h, responses)
|
||||
}
|
||||
saveABCIResponses(stateDB, responses)
|
||||
|
||||
// Query all before, should return expected value.
|
||||
for i, tc := range cases {
|
||||
h := int64(i + 1)
|
||||
res, err := LoadABCIResponses(stateDB, h)
|
||||
assert.NoError(err, "%d", i)
|
||||
assert.Equal(tc.expected.Hash(), res.ResultsHash(), "%d", i)
|
||||
res, err = LoadABCIResponses(stateDB)
|
||||
assert.NoError(t, err, "%d", i)
|
||||
assert.Equal(t, tc.expected.Hash(), res.ResultsHash(), "%d", i)
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -9,6 +9,10 @@ import (
|
||||
"github.com/tendermint/tendermint/types"
|
||||
)
|
||||
|
||||
var (
|
||||
abciResponsesKey = []byte("abciResponsesKey")
|
||||
)
|
||||
|
||||
//------------------------------------------------------------------------
|
||||
|
||||
func calcValidatorsKey(height int64) []byte {
|
||||
@@ -19,10 +23,6 @@ func calcConsensusParamsKey(height int64) []byte {
|
||||
return []byte(fmt.Sprintf("consensusParamsKey:%v", height))
|
||||
}
|
||||
|
||||
func calcABCIResponsesKey(height int64) []byte {
|
||||
return []byte(fmt.Sprintf("abciResponsesKey:%v", height))
|
||||
}
|
||||
|
||||
// LoadStateFromDBOrGenesisFile loads the most recent state from the database,
|
||||
// or creates a new one from the given genesisFilePath and persists the result
|
||||
// to the database.
|
||||
@@ -137,10 +137,10 @@ func (arz *ABCIResponses) ResultsHash() []byte {
|
||||
// LoadABCIResponses loads the ABCIResponses for the given height from the database.
|
||||
// This is useful for recovering from crashes where we called app.Commit and before we called
|
||||
// s.Save(). It can also be used to produce Merkle proofs of the result of txs.
|
||||
func LoadABCIResponses(db dbm.DB, height int64) (*ABCIResponses, error) {
|
||||
buf := db.Get(calcABCIResponsesKey(height))
|
||||
func LoadABCIResponses(db dbm.DB) (*ABCIResponses, error) {
|
||||
buf := db.Get(abciResponsesKey)
|
||||
if len(buf) == 0 {
|
||||
return nil, ErrNoABCIResponsesForHeight{height}
|
||||
return nil, ErrNoABCIResponses
|
||||
}
|
||||
|
||||
abciResponses := new(ABCIResponses)
|
||||
@@ -155,11 +155,10 @@ func LoadABCIResponses(db dbm.DB, height int64) (*ABCIResponses, error) {
|
||||
return abciResponses, nil
|
||||
}
|
||||
|
||||
// SaveABCIResponses persists the ABCIResponses to the database.
|
||||
// saveABCIResponses persists the ABCIResponses to the database.
|
||||
// This is useful in case we crash after app.Commit and before s.Save().
|
||||
// Responses are indexed by height so they can also be loaded later to produce Merkle proofs.
|
||||
func saveABCIResponses(db dbm.DB, height int64, abciResponses *ABCIResponses) {
|
||||
db.SetSync(calcABCIResponsesKey(height), abciResponses.Bytes())
|
||||
func saveABCIResponses(db dbm.DB, abciResponses *ABCIResponses) {
|
||||
db.SetSync(abciResponsesKey, abciResponses.Bytes())
|
||||
}
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
|
@@ -9,7 +9,6 @@ import (
|
||||
|
||||
// TxIndexer interface defines methods to index and search transactions.
|
||||
type TxIndexer interface {
|
||||
|
||||
// AddBatch analyzes, indexes and stores a batch of transactions.
|
||||
AddBatch(b *Batch) error
|
||||
|
||||
@@ -22,6 +21,9 @@ type TxIndexer interface {
|
||||
|
||||
// Search allows you to query for transactions.
|
||||
Search(q *query.Query) ([]*types.TxResult, error)
|
||||
|
||||
// IsTagIndexed returns true if the tag is being indexed, false - otherwise.
|
||||
IsTagIndexed(tag string) bool
|
||||
}
|
||||
|
||||
//----------------------------------------------------
|
||||
|
@@ -10,9 +10,9 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
cmn "github.com/tendermint/tendermint/libs/common"
|
||||
dbm "github.com/tendermint/tendermint/libs/db"
|
||||
|
||||
"github.com/tendermint/tendermint/libs/pubsub/query"
|
||||
"github.com/tendermint/tendermint/state/txindex"
|
||||
"github.com/tendermint/tendermint/types"
|
||||
@@ -54,6 +54,11 @@ func IndexAllTags() func(*TxIndex) {
|
||||
}
|
||||
}
|
||||
|
||||
// IsTagIndexed returns true if the tag is being indexed, false - otherwise.
|
||||
func (txi *TxIndex) IsTagIndexed(tag string) bool {
|
||||
return txi.indexAllTags || cmn.StringInSlice(tag, txi.tagsToIndex)
|
||||
}
|
||||
|
||||
// Get gets transaction from the TxIndex storage and returns it or nil if the
|
||||
// transaction is not found.
|
||||
func (txi *TxIndex) Get(hash []byte) (*types.TxResult, error) {
|
||||
|
@@ -13,6 +13,11 @@ var _ txindex.TxIndexer = (*TxIndex)(nil)
|
||||
// TxIndex acts as a /dev/null.
|
||||
type TxIndex struct{}
|
||||
|
||||
// IsTagIndexed always returns false.
|
||||
func (txi *TxIndex) IsTagIndexed(tag string) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// Get on a TxIndex is disabled and panics when invoked.
|
||||
func (txi *TxIndex) Get(hash []byte) (*types.TxResult, error) {
|
||||
return nil, errors.New(`Indexing is disabled (set 'tx_index = "kv"' in config)`)
|
||||
|
Reference in New Issue
Block a user