mirror of
https://github.com/fluencelabs/tendermint
synced 2025-08-01 04:31:57 +00:00
Compare commits
8 Commits
jae/verify
...
2491-delet
Author | SHA1 | Date | |
---|---|---|---|
|
ecf1d3980a | ||
|
ca70680728 | ||
|
be1b759d2f | ||
|
9161d9fd14 | ||
|
45c6f2fec7 | ||
|
0ff6526975 | ||
|
5afe5c4af5 | ||
|
b1fbfebc79 |
@@ -5,6 +5,7 @@
|
|||||||
### BREAKING CHANGES:
|
### BREAKING CHANGES:
|
||||||
|
|
||||||
* CLI/RPC/Config
|
* CLI/RPC/Config
|
||||||
|
* [rpc] `/block_results` now uses indexer to fetch block results for heights < `latestBlockHeight`
|
||||||
|
|
||||||
* Apps
|
* Apps
|
||||||
|
|
||||||
@@ -24,5 +25,5 @@
|
|||||||
- [mempool] \#2778 No longer send txs back to peers who sent it to you
|
- [mempool] \#2778 No longer send txs back to peers who sent it to you
|
||||||
|
|
||||||
### BUG FIXES:
|
### BUG FIXES:
|
||||||
|
- [state] \#2491 Store ABCIResponses only for the last block
|
||||||
- [blockchain] \#2699 update the maxHeight when a peer is removed
|
- [blockchain] \#2699 update the maxHeight when a peer is removed
|
||||||
|
@@ -381,7 +381,7 @@ func (h *Handshaker) ReplayBlocks(
|
|||||||
|
|
||||||
} else if appBlockHeight == storeBlockHeight {
|
} else if appBlockHeight == storeBlockHeight {
|
||||||
// We ran Commit, but didn't save the state, so replayBlock with mock app
|
// We ran Commit, but didn't save the state, so replayBlock with mock app
|
||||||
abciResponses, err := sm.LoadABCIResponses(h.stateDB, storeBlockHeight)
|
abciResponses, err := sm.LoadABCIResponses(h.stateDB)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@@ -3,10 +3,15 @@ package core
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
|
||||||
|
abci "github.com/tendermint/tendermint/abci/types"
|
||||||
cmn "github.com/tendermint/tendermint/libs/common"
|
cmn "github.com/tendermint/tendermint/libs/common"
|
||||||
|
tmquery "github.com/tendermint/tendermint/libs/pubsub/query"
|
||||||
ctypes "github.com/tendermint/tendermint/rpc/core/types"
|
ctypes "github.com/tendermint/tendermint/rpc/core/types"
|
||||||
rpctypes "github.com/tendermint/tendermint/rpc/lib/types"
|
rpctypes "github.com/tendermint/tendermint/rpc/lib/types"
|
||||||
sm "github.com/tendermint/tendermint/state"
|
sm "github.com/tendermint/tendermint/state"
|
||||||
|
"github.com/tendermint/tendermint/state/txindex/null"
|
||||||
"github.com/tendermint/tendermint/types"
|
"github.com/tendermint/tendermint/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -338,6 +343,10 @@ func Commit(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultCommit, erro
|
|||||||
// BlockResults gets ABCIResults at a given height.
|
// BlockResults gets ABCIResults at a given height.
|
||||||
// If no height is provided, it will fetch results for the latest block.
|
// If no height is provided, it will fetch results for the latest block.
|
||||||
//
|
//
|
||||||
|
// Results for the latest height are fetched from the state database, for
|
||||||
|
// earlier heights - using the indexer. Note: indexer does not store EndBlock
|
||||||
|
// ("end_block": null).
|
||||||
|
//
|
||||||
// Results are for the height of the block containing the txs.
|
// Results are for the height of the block containing the txs.
|
||||||
// Thus response.results[5] is the results of executing getBlock(h).Txs[5]
|
// Thus response.results[5] is the results of executing getBlock(h).Txs[5]
|
||||||
//
|
//
|
||||||
@@ -380,17 +389,63 @@ func BlockResults(ctx *rpctypes.Context, heightPtr *int64) (*ctypes.ResultBlockR
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// load the results
|
// for last height, use state DB
|
||||||
results, err := sm.LoadABCIResponses(stateDB, height)
|
if height == storeHeight {
|
||||||
if err != nil {
|
results, err := sm.LoadABCIResponses(stateDB)
|
||||||
return nil, err
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &ctypes.ResultBlockResults{
|
||||||
|
Height: height,
|
||||||
|
Results: results,
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
res := &ctypes.ResultBlockResults{
|
// for other heights, use indexer
|
||||||
Height: height,
|
// if index is disabled, return error
|
||||||
Results: results,
|
if _, ok := txIndexer.(*null.TxIndex); ok {
|
||||||
|
return nil, fmt.Errorf("transaction indexing is disabled")
|
||||||
}
|
}
|
||||||
return res, nil
|
|
||||||
|
var deliverTxs []*abci.ResponseDeliverTx
|
||||||
|
|
||||||
|
// If Tendermint is configured to index txs by height, use it for faster search.
|
||||||
|
if txIndexer.IsTagIndexed(types.TxHeightKey) {
|
||||||
|
results, err := txIndexer.Search(
|
||||||
|
tmquery.MustParse(fmt.Sprintf("%s = %d", types.TxHeightKey, height)),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrap(err, "failed to search indexer for txs")
|
||||||
|
}
|
||||||
|
deliverTxs = make([]*abci.ResponseDeliverTx, len(results))
|
||||||
|
for i, r := range results {
|
||||||
|
deliverTxs[i] = &r.Result
|
||||||
|
}
|
||||||
|
} else { // otherwise, load the whole block and fetch txs one by one
|
||||||
|
block := blockStore.LoadBlock(height)
|
||||||
|
if block != nil {
|
||||||
|
deliverTxs = make([]*abci.ResponseDeliverTx, len(block.Data.Txs))
|
||||||
|
|
||||||
|
for i, tx := range block.Data.Txs {
|
||||||
|
r, err := txIndexer.Get(tx.Hash())
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrapf(err, "failed to get Tx (%X) from indexer", tx.Hash())
|
||||||
|
}
|
||||||
|
if r == nil {
|
||||||
|
return nil, fmt.Errorf("Tx (%X) not found", tx.Hash())
|
||||||
|
}
|
||||||
|
deliverTxs[i] = &r.Result
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &ctypes.ResultBlockResults{
|
||||||
|
Height: height,
|
||||||
|
Results: &sm.ABCIResponses{
|
||||||
|
DeliverTx: deliverTxs,
|
||||||
|
EndBlock: nil,
|
||||||
|
},
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func getHeight(currentHeight int64, heightPtr *int64) (int64, error) {
|
func getHeight(currentHeight int64, heightPtr *int64) (int64, error) {
|
||||||
|
@@ -1,6 +1,9 @@
|
|||||||
package state
|
package state
|
||||||
|
|
||||||
import "fmt"
|
import (
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
)
|
||||||
|
|
||||||
type (
|
type (
|
||||||
ErrInvalidBlock error
|
ErrInvalidBlock error
|
||||||
@@ -39,10 +42,10 @@ type (
|
|||||||
ErrNoConsensusParamsForHeight struct {
|
ErrNoConsensusParamsForHeight struct {
|
||||||
Height int64
|
Height int64
|
||||||
}
|
}
|
||||||
|
)
|
||||||
|
|
||||||
ErrNoABCIResponsesForHeight struct {
|
var (
|
||||||
Height int64
|
ErrNoABCIResponses = errors.New("could not find results")
|
||||||
}
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func (e ErrUnknownBlock) Error() string {
|
func (e ErrUnknownBlock) Error() string {
|
||||||
@@ -71,7 +74,3 @@ func (e ErrNoValSetForHeight) Error() string {
|
|||||||
func (e ErrNoConsensusParamsForHeight) Error() string {
|
func (e ErrNoConsensusParamsForHeight) Error() string {
|
||||||
return fmt.Sprintf("Could not find consensus params for height #%d", e.Height)
|
return fmt.Sprintf("Could not find consensus params for height #%d", e.Height)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e ErrNoABCIResponsesForHeight) Error() string {
|
|
||||||
return fmt.Sprintf("Could not find results for height #%d", e.Height)
|
|
||||||
}
|
|
||||||
|
@@ -126,7 +126,7 @@ func (blockExec *BlockExecutor) ApplyBlock(state State, blockID types.BlockID, b
|
|||||||
fail.Fail() // XXX
|
fail.Fail() // XXX
|
||||||
|
|
||||||
// Save the results before we commit.
|
// Save the results before we commit.
|
||||||
saveABCIResponses(blockExec.db, block.Height, abciResponses)
|
saveABCIResponses(blockExec.db,abciResponses)
|
||||||
|
|
||||||
fail.Fail() // XXX
|
fail.Fail() // XXX
|
||||||
|
|
||||||
|
@@ -32,7 +32,7 @@ func TestApplyBlock(t *testing.T) {
|
|||||||
cc := proxy.NewLocalClientCreator(kvstore.NewKVStoreApplication())
|
cc := proxy.NewLocalClientCreator(kvstore.NewKVStoreApplication())
|
||||||
proxyApp := proxy.NewAppConns(cc)
|
proxyApp := proxy.NewAppConns(cc)
|
||||||
err := proxyApp.Start()
|
err := proxyApp.Start()
|
||||||
require.Nil(t, err)
|
require.NoError(t, err)
|
||||||
defer proxyApp.Stop()
|
defer proxyApp.Stop()
|
||||||
|
|
||||||
state, stateDB := state(1, 1)
|
state, stateDB := state(1, 1)
|
||||||
@@ -45,7 +45,13 @@ func TestApplyBlock(t *testing.T) {
|
|||||||
|
|
||||||
//nolint:ineffassign
|
//nolint:ineffassign
|
||||||
state, err = blockExec.ApplyBlock(state, blockID, block)
|
state, err = blockExec.ApplyBlock(state, blockID, block)
|
||||||
require.Nil(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
// test we save results
|
||||||
|
abciResponses, err := LoadABCIResponses(stateDB)
|
||||||
|
if assert.NoError(t, err) {
|
||||||
|
assert.NotNil(t, abciResponses)
|
||||||
|
}
|
||||||
|
|
||||||
// TODO check state and mempool
|
// TODO check state and mempool
|
||||||
}
|
}
|
||||||
|
@@ -12,12 +12,11 @@ import (
|
|||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
abci "github.com/tendermint/tendermint/abci/types"
|
abci "github.com/tendermint/tendermint/abci/types"
|
||||||
|
cfg "github.com/tendermint/tendermint/config"
|
||||||
"github.com/tendermint/tendermint/crypto"
|
"github.com/tendermint/tendermint/crypto"
|
||||||
"github.com/tendermint/tendermint/crypto/ed25519"
|
"github.com/tendermint/tendermint/crypto/ed25519"
|
||||||
cmn "github.com/tendermint/tendermint/libs/common"
|
cmn "github.com/tendermint/tendermint/libs/common"
|
||||||
dbm "github.com/tendermint/tendermint/libs/db"
|
dbm "github.com/tendermint/tendermint/libs/db"
|
||||||
|
|
||||||
cfg "github.com/tendermint/tendermint/config"
|
|
||||||
"github.com/tendermint/tendermint/types"
|
"github.com/tendermint/tendermint/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -99,8 +98,8 @@ func TestABCIResponsesSaveLoad1(t *testing.T) {
|
|||||||
types.TM2PB.NewValidatorUpdate(ed25519.GenPrivKey().PubKey(), 10),
|
types.TM2PB.NewValidatorUpdate(ed25519.GenPrivKey().PubKey(), 10),
|
||||||
}}
|
}}
|
||||||
|
|
||||||
saveABCIResponses(stateDB, block.Height, abciResponses)
|
saveABCIResponses(stateDB, abciResponses)
|
||||||
loadedABCIResponses, err := LoadABCIResponses(stateDB, block.Height)
|
loadedABCIResponses, err := LoadABCIResponses(stateDB)
|
||||||
assert.Nil(err)
|
assert.Nil(err)
|
||||||
assert.Equal(abciResponses, loadedABCIResponses,
|
assert.Equal(abciResponses, loadedABCIResponses,
|
||||||
fmt.Sprintf("ABCIResponses don't match:\ngot: %v\nexpected: %v\n",
|
fmt.Sprintf("ABCIResponses don't match:\ngot: %v\nexpected: %v\n",
|
||||||
@@ -111,8 +110,6 @@ func TestABCIResponsesSaveLoad1(t *testing.T) {
|
|||||||
func TestABCIResponsesSaveLoad2(t *testing.T) {
|
func TestABCIResponsesSaveLoad2(t *testing.T) {
|
||||||
tearDown, stateDB, _ := setupTestCase(t)
|
tearDown, stateDB, _ := setupTestCase(t)
|
||||||
defer tearDown(t)
|
defer tearDown(t)
|
||||||
// nolint: vetshadow
|
|
||||||
assert := assert.New(t)
|
|
||||||
|
|
||||||
cases := [...]struct {
|
cases := [...]struct {
|
||||||
// Height is implied to equal index+2,
|
// Height is implied to equal index+2,
|
||||||
@@ -150,29 +147,19 @@ func TestABCIResponsesSaveLoad2(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
// Query all before, this should return error.
|
res, err := LoadABCIResponses(stateDB)
|
||||||
for i := range cases {
|
assert.Error(t, err, "%#v", res)
|
||||||
h := int64(i + 1)
|
|
||||||
res, err := LoadABCIResponses(stateDB, h)
|
|
||||||
assert.Error(err, "%d: %#v", i, res)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add all cases.
|
|
||||||
for i, tc := range cases {
|
for i, tc := range cases {
|
||||||
h := int64(i + 1) // last block height, one below what we save
|
|
||||||
responses := &ABCIResponses{
|
responses := &ABCIResponses{
|
||||||
DeliverTx: tc.added,
|
DeliverTx: tc.added,
|
||||||
EndBlock: &abci.ResponseEndBlock{},
|
EndBlock: &abci.ResponseEndBlock{},
|
||||||
}
|
}
|
||||||
saveABCIResponses(stateDB, h, responses)
|
saveABCIResponses(stateDB, responses)
|
||||||
}
|
|
||||||
|
|
||||||
// Query all before, should return expected value.
|
res, err = LoadABCIResponses(stateDB)
|
||||||
for i, tc := range cases {
|
assert.NoError(t, err, "%d", i)
|
||||||
h := int64(i + 1)
|
assert.Equal(t, tc.expected.Hash(), res.ResultsHash(), "%d", i)
|
||||||
res, err := LoadABCIResponses(stateDB, h)
|
|
||||||
assert.NoError(err, "%d", i)
|
|
||||||
assert.Equal(tc.expected.Hash(), res.ResultsHash(), "%d", i)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -9,6 +9,10 @@ import (
|
|||||||
"github.com/tendermint/tendermint/types"
|
"github.com/tendermint/tendermint/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
abciResponsesKey = []byte("abciResponsesKey")
|
||||||
|
)
|
||||||
|
|
||||||
//------------------------------------------------------------------------
|
//------------------------------------------------------------------------
|
||||||
|
|
||||||
func calcValidatorsKey(height int64) []byte {
|
func calcValidatorsKey(height int64) []byte {
|
||||||
@@ -19,10 +23,6 @@ func calcConsensusParamsKey(height int64) []byte {
|
|||||||
return []byte(fmt.Sprintf("consensusParamsKey:%v", height))
|
return []byte(fmt.Sprintf("consensusParamsKey:%v", height))
|
||||||
}
|
}
|
||||||
|
|
||||||
func calcABCIResponsesKey(height int64) []byte {
|
|
||||||
return []byte(fmt.Sprintf("abciResponsesKey:%v", height))
|
|
||||||
}
|
|
||||||
|
|
||||||
// LoadStateFromDBOrGenesisFile loads the most recent state from the database,
|
// LoadStateFromDBOrGenesisFile loads the most recent state from the database,
|
||||||
// or creates a new one from the given genesisFilePath and persists the result
|
// or creates a new one from the given genesisFilePath and persists the result
|
||||||
// to the database.
|
// to the database.
|
||||||
@@ -137,10 +137,10 @@ func (arz *ABCIResponses) ResultsHash() []byte {
|
|||||||
// LoadABCIResponses loads the ABCIResponses for the given height from the database.
|
// LoadABCIResponses loads the ABCIResponses for the given height from the database.
|
||||||
// This is useful for recovering from crashes where we called app.Commit and before we called
|
// This is useful for recovering from crashes where we called app.Commit and before we called
|
||||||
// s.Save(). It can also be used to produce Merkle proofs of the result of txs.
|
// s.Save(). It can also be used to produce Merkle proofs of the result of txs.
|
||||||
func LoadABCIResponses(db dbm.DB, height int64) (*ABCIResponses, error) {
|
func LoadABCIResponses(db dbm.DB) (*ABCIResponses, error) {
|
||||||
buf := db.Get(calcABCIResponsesKey(height))
|
buf := db.Get(abciResponsesKey)
|
||||||
if len(buf) == 0 {
|
if len(buf) == 0 {
|
||||||
return nil, ErrNoABCIResponsesForHeight{height}
|
return nil, ErrNoABCIResponses
|
||||||
}
|
}
|
||||||
|
|
||||||
abciResponses := new(ABCIResponses)
|
abciResponses := new(ABCIResponses)
|
||||||
@@ -155,11 +155,10 @@ func LoadABCIResponses(db dbm.DB, height int64) (*ABCIResponses, error) {
|
|||||||
return abciResponses, nil
|
return abciResponses, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// SaveABCIResponses persists the ABCIResponses to the database.
|
// saveABCIResponses persists the ABCIResponses to the database.
|
||||||
// This is useful in case we crash after app.Commit and before s.Save().
|
// This is useful in case we crash after app.Commit and before s.Save().
|
||||||
// Responses are indexed by height so they can also be loaded later to produce Merkle proofs.
|
func saveABCIResponses(db dbm.DB, abciResponses *ABCIResponses) {
|
||||||
func saveABCIResponses(db dbm.DB, height int64, abciResponses *ABCIResponses) {
|
db.SetSync(abciResponsesKey, abciResponses.Bytes())
|
||||||
db.SetSync(calcABCIResponsesKey(height), abciResponses.Bytes())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
//-----------------------------------------------------------------------------
|
//-----------------------------------------------------------------------------
|
||||||
|
@@ -9,7 +9,6 @@ import (
|
|||||||
|
|
||||||
// TxIndexer interface defines methods to index and search transactions.
|
// TxIndexer interface defines methods to index and search transactions.
|
||||||
type TxIndexer interface {
|
type TxIndexer interface {
|
||||||
|
|
||||||
// AddBatch analyzes, indexes and stores a batch of transactions.
|
// AddBatch analyzes, indexes and stores a batch of transactions.
|
||||||
AddBatch(b *Batch) error
|
AddBatch(b *Batch) error
|
||||||
|
|
||||||
@@ -22,6 +21,9 @@ type TxIndexer interface {
|
|||||||
|
|
||||||
// Search allows you to query for transactions.
|
// Search allows you to query for transactions.
|
||||||
Search(q *query.Query) ([]*types.TxResult, error)
|
Search(q *query.Query) ([]*types.TxResult, error)
|
||||||
|
|
||||||
|
// IsTagIndexed returns true if the tag is being indexed, false - otherwise.
|
||||||
|
IsTagIndexed(tag string) bool
|
||||||
}
|
}
|
||||||
|
|
||||||
//----------------------------------------------------
|
//----------------------------------------------------
|
||||||
|
@@ -10,9 +10,9 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
|
|
||||||
cmn "github.com/tendermint/tendermint/libs/common"
|
cmn "github.com/tendermint/tendermint/libs/common"
|
||||||
dbm "github.com/tendermint/tendermint/libs/db"
|
dbm "github.com/tendermint/tendermint/libs/db"
|
||||||
|
|
||||||
"github.com/tendermint/tendermint/libs/pubsub/query"
|
"github.com/tendermint/tendermint/libs/pubsub/query"
|
||||||
"github.com/tendermint/tendermint/state/txindex"
|
"github.com/tendermint/tendermint/state/txindex"
|
||||||
"github.com/tendermint/tendermint/types"
|
"github.com/tendermint/tendermint/types"
|
||||||
@@ -54,6 +54,11 @@ func IndexAllTags() func(*TxIndex) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IsTagIndexed returns true if the tag is being indexed, false - otherwise.
|
||||||
|
func (txi *TxIndex) IsTagIndexed(tag string) bool {
|
||||||
|
return txi.indexAllTags || cmn.StringInSlice(tag, txi.tagsToIndex)
|
||||||
|
}
|
||||||
|
|
||||||
// Get gets transaction from the TxIndex storage and returns it or nil if the
|
// Get gets transaction from the TxIndex storage and returns it or nil if the
|
||||||
// transaction is not found.
|
// transaction is not found.
|
||||||
func (txi *TxIndex) Get(hash []byte) (*types.TxResult, error) {
|
func (txi *TxIndex) Get(hash []byte) (*types.TxResult, error) {
|
||||||
|
@@ -13,6 +13,11 @@ var _ txindex.TxIndexer = (*TxIndex)(nil)
|
|||||||
// TxIndex acts as a /dev/null.
|
// TxIndex acts as a /dev/null.
|
||||||
type TxIndex struct{}
|
type TxIndex struct{}
|
||||||
|
|
||||||
|
// IsTagIndexed always returns false.
|
||||||
|
func (txi *TxIndex) IsTagIndexed(tag string) bool {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
// Get on a TxIndex is disabled and panics when invoked.
|
// Get on a TxIndex is disabled and panics when invoked.
|
||||||
func (txi *TxIndex) Get(hash []byte) (*types.TxResult, error) {
|
func (txi *TxIndex) Get(hash []byte) (*types.TxResult, error) {
|
||||||
return nil, errors.New(`Indexing is disabled (set 'tx_index = "kv"' in config)`)
|
return nil, errors.New(`Indexing is disabled (set 'tx_index = "kv"' in config)`)
|
||||||
|
Reference in New Issue
Block a user