mirror of
https://github.com/fluencelabs/tendermint
synced 2025-06-13 21:31:23 +00:00
state: ABCIResponses, s.Save() in ApplyBlock
This commit is contained in:
@ -251,7 +251,6 @@ FOR_LOOP:
|
|||||||
// TODO This is bad, are we zombie?
|
// TODO This is bad, are we zombie?
|
||||||
cmn.PanicQ(cmn.Fmt("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))
|
cmn.PanicQ(cmn.Fmt("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))
|
||||||
}
|
}
|
||||||
bcR.state.Save()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
continue FOR_LOOP
|
continue FOR_LOOP
|
||||||
|
@ -1221,7 +1221,7 @@ func (cs *ConsensusState) finalizeCommit(height int) {
|
|||||||
stateCopy := cs.state.Copy()
|
stateCopy := cs.state.Copy()
|
||||||
eventCache := types.NewEventCache(cs.evsw)
|
eventCache := types.NewEventCache(cs.evsw)
|
||||||
|
|
||||||
// Execute and commit the block, and update the mempool.
|
// Execute and commit the block, update and save the state, and update the mempool.
|
||||||
// All calls to the proxyAppConn should come here.
|
// All calls to the proxyAppConn should come here.
|
||||||
// NOTE: the block.AppHash wont reflect these txs until the next block
|
// NOTE: the block.AppHash wont reflect these txs until the next block
|
||||||
err := stateCopy.ApplyBlock(eventCache, cs.proxyAppConn, block, blockParts.Header(), cs.mempool)
|
err := stateCopy.ApplyBlock(eventCache, cs.proxyAppConn, block, blockParts.Header(), cs.mempool)
|
||||||
@ -1233,14 +1233,10 @@ func (cs *ConsensusState) finalizeCommit(height int) {
|
|||||||
fail.Fail() // XXX
|
fail.Fail() // XXX
|
||||||
|
|
||||||
// Fire off event for new block.
|
// Fire off event for new block.
|
||||||
// TODO: Handle app failure. See #177
|
|
||||||
types.FireEventNewBlock(cs.evsw, types.EventDataNewBlock{block})
|
types.FireEventNewBlock(cs.evsw, types.EventDataNewBlock{block})
|
||||||
types.FireEventNewBlockHeader(cs.evsw, types.EventDataNewBlockHeader{block.Header})
|
types.FireEventNewBlockHeader(cs.evsw, types.EventDataNewBlockHeader{block.Header})
|
||||||
eventCache.Flush()
|
eventCache.Flush()
|
||||||
|
|
||||||
// Save the state.
|
|
||||||
stateCopy.Save()
|
|
||||||
|
|
||||||
fail.Fail() // XXX
|
fail.Fail() // XXX
|
||||||
|
|
||||||
// NewHeightStep!
|
// NewHeightStep!
|
||||||
|
@ -13,59 +13,39 @@ import (
|
|||||||
"github.com/tendermint/tendermint/types"
|
"github.com/tendermint/tendermint/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ExecBlock executes the block to mutate State.
|
//--------------------------------------------------
|
||||||
|
// Execute the block
|
||||||
|
|
||||||
|
// ExecBlock executes the block, but does NOT mutate State.
|
||||||
// + validates the block
|
// + validates the block
|
||||||
// + executes block.Txs on the proxyAppConn
|
// + executes block.Txs on the proxyAppConn
|
||||||
// + updates validator sets
|
func (s *State) ExecBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block) (*ABCIResponses, error) {
|
||||||
// + returns block.Txs results
|
|
||||||
func (s *State) ExecBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block, blockPartsHeader types.PartSetHeader) ([]*types.TxResult, error) {
|
|
||||||
// Validate the block.
|
// Validate the block.
|
||||||
if err := s.validateBlock(block); err != nil {
|
if err := s.validateBlock(block); err != nil {
|
||||||
return nil, ErrInvalidBlock(err)
|
return nil, ErrInvalidBlock(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// compute bitarray of validators that signed
|
|
||||||
signed := commitBitArrayFromBlock(block)
|
|
||||||
_ = signed // TODO send on begin block
|
|
||||||
|
|
||||||
// copy the valset
|
|
||||||
valSet := s.Validators.Copy()
|
|
||||||
nextValSet := valSet.Copy()
|
|
||||||
|
|
||||||
// Execute the block txs
|
// Execute the block txs
|
||||||
txResults, changedValidators, err := execBlockOnProxyApp(eventCache, proxyAppConn, block)
|
abciResponses, err := execBlockOnProxyApp(eventCache, proxyAppConn, block)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// There was some error in proxyApp
|
// There was some error in proxyApp
|
||||||
// TODO Report error and wait for proxyApp to be available.
|
// TODO Report error and wait for proxyApp to be available.
|
||||||
return nil, ErrProxyAppConn(err)
|
return nil, ErrProxyAppConn(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// update the validator set
|
return abciResponses, nil
|
||||||
err = updateValidators(nextValSet, changedValidators)
|
|
||||||
if err != nil {
|
|
||||||
log.Warn("Error changing validator set", "error", err)
|
|
||||||
// TODO: err or carry on?
|
|
||||||
}
|
|
||||||
|
|
||||||
// All good!
|
|
||||||
// Update validator accums and set state variables
|
|
||||||
nextValSet.IncrementAccum(1)
|
|
||||||
s.SetBlockAndValidators(block.Header, blockPartsHeader, valSet, nextValSet)
|
|
||||||
|
|
||||||
fail.Fail() // XXX
|
|
||||||
|
|
||||||
return txResults, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Executes block's transactions on proxyAppConn.
|
// Executes block's transactions on proxyAppConn.
|
||||||
// Returns a list of transaction results and updates to the validator set
|
// Returns a list of transaction results and updates to the validator set
|
||||||
// TODO: Generate a bitmap or otherwise store tx validity in state.
|
// TODO: Generate a bitmap or otherwise store tx validity in state.
|
||||||
func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block) ([]*types.TxResult, []*abci.Validator, error) {
|
func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block) (*ABCIResponses, error) {
|
||||||
var validTxs, invalidTxs = 0, 0
|
var validTxs, invalidTxs = 0, 0
|
||||||
|
|
||||||
txResults := make([]*types.TxResult, len(block.Txs))
|
|
||||||
txIndex := 0
|
txIndex := 0
|
||||||
|
|
||||||
|
abciResponses := NewABCIResponses(block)
|
||||||
|
|
||||||
// Execute transactions and get hash
|
// Execute transactions and get hash
|
||||||
proxyCb := func(req *abci.Request, res *abci.Response) {
|
proxyCb := func(req *abci.Request, res *abci.Response) {
|
||||||
switch r := res.Value.(type) {
|
switch r := res.Value.(type) {
|
||||||
@ -84,12 +64,7 @@ func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnCo
|
|||||||
txError = txResult.Code.String()
|
txError = txResult.Code.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
txResults[txIndex] = &types.TxResult{
|
abciResponses.TxResults[txIndex] = &types.TxResult{uint64(block.Height), uint32(txIndex), *txResult}
|
||||||
Height: uint64(block.Height),
|
|
||||||
Index: uint32(txIndex),
|
|
||||||
Tx: req.GetDeliverTx().Tx,
|
|
||||||
Result: *txResult,
|
|
||||||
}
|
|
||||||
txIndex++
|
txIndex++
|
||||||
|
|
||||||
// NOTE: if we count we can access the tx from the block instead of
|
// NOTE: if we count we can access the tx from the block instead of
|
||||||
@ -111,7 +86,7 @@ func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnCo
|
|||||||
err := proxyAppConn.BeginBlockSync(block.Hash(), types.TM2PB.Header(block.Header))
|
err := proxyAppConn.BeginBlockSync(block.Hash(), types.TM2PB.Header(block.Header))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("Error in proxyAppConn.BeginBlock", "error", err)
|
log.Warn("Error in proxyAppConn.BeginBlock", "error", err)
|
||||||
return nil, nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
fail.Fail() // XXX
|
fail.Fail() // XXX
|
||||||
@ -121,7 +96,7 @@ func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnCo
|
|||||||
fail.FailRand(len(block.Txs)) // XXX
|
fail.FailRand(len(block.Txs)) // XXX
|
||||||
proxyAppConn.DeliverTxAsync(tx)
|
proxyAppConn.DeliverTxAsync(tx)
|
||||||
if err := proxyAppConn.Error(); err != nil {
|
if err := proxyAppConn.Error(); err != nil {
|
||||||
return nil, nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -131,7 +106,7 @@ func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnCo
|
|||||||
respEndBlock, err := proxyAppConn.EndBlockSync(uint64(block.Height))
|
respEndBlock, err := proxyAppConn.EndBlockSync(uint64(block.Height))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("Error in proxyAppConn.EndBlock", "error", err)
|
log.Warn("Error in proxyAppConn.EndBlock", "error", err)
|
||||||
return nil, nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
fail.Fail() // XXX
|
fail.Fail() // XXX
|
||||||
@ -140,7 +115,9 @@ func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnCo
|
|||||||
if len(respEndBlock.Diffs) > 0 {
|
if len(respEndBlock.Diffs) > 0 {
|
||||||
log.Info("Update to validator set", "updates", abci.ValidatorsString(respEndBlock.Diffs))
|
log.Info("Update to validator set", "updates", abci.ValidatorsString(respEndBlock.Diffs))
|
||||||
}
|
}
|
||||||
return txResults, respEndBlock.Diffs, nil
|
abciResponses.Validators = respEndBlock.Diffs
|
||||||
|
|
||||||
|
return abciResponses, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func updateValidators(validators *types.ValidatorSet, changedValidators []*abci.Validator) error {
|
func updateValidators(validators *types.ValidatorSet, changedValidators []*abci.Validator) error {
|
||||||
@ -230,16 +207,30 @@ func (s *State) validateBlock(block *types.Block) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ApplyBlock executes the block, then commits and updates the mempool
|
//-----------------------------------------------------------------------------
|
||||||
// atomically, optionally indexing transaction results.
|
// ApplyBlock executes the block, updates state w/ ABCI responses,
|
||||||
|
// then commits and updates the mempool atomically, then saves state.
|
||||||
|
// Transaction results are optionally indexed.
|
||||||
|
|
||||||
|
// Execute and commit block against app, save block and state
|
||||||
func (s *State) ApplyBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus,
|
func (s *State) ApplyBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus,
|
||||||
block *types.Block, partsHeader types.PartSetHeader, mempool types.Mempool) error {
|
block *types.Block, partsHeader types.PartSetHeader, mempool types.Mempool) error {
|
||||||
|
|
||||||
txResults, err := s.ExecBlock(eventCache, proxyAppConn, block, partsHeader)
|
abciResponses, err := s.ExecBlock(eventCache, proxyAppConn, block)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("Exec failed for application: %v", err)
|
return fmt.Errorf("Exec failed for application: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fail.Fail() // XXX
|
||||||
|
|
||||||
|
// save the results before we commit
|
||||||
|
s.SaveABCIResponses(abciResponses)
|
||||||
|
|
||||||
|
fail.Fail() // XXX
|
||||||
|
|
||||||
|
// now update the block and validators
|
||||||
|
s.SetBlockAndValidators(block.Header, partsHeader)
|
||||||
|
|
||||||
// lock mempool, commit state, update mempoool
|
// lock mempool, commit state, update mempoool
|
||||||
err = s.CommitStateUpdateMempool(proxyAppConn, block, mempool)
|
err = s.CommitStateUpdateMempool(proxyAppConn, block, mempool)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -252,6 +243,11 @@ func (s *State) ApplyBlock(eventCache types.Fireable, proxyAppConn proxy.AppConn
|
|||||||
}
|
}
|
||||||
s.TxIndexer.AddBatch(batch)
|
s.TxIndexer.AddBatch(batch)
|
||||||
|
|
||||||
|
fail.Fail() // XXX
|
||||||
|
|
||||||
|
// save the state
|
||||||
|
s.Save()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -284,9 +280,10 @@ func (s *State) CommitStateUpdateMempool(proxyAppConn proxy.AppConnConsensus, bl
|
|||||||
|
|
||||||
// Apply and commit a block, but without all the state validation.
|
// Apply and commit a block, but without all the state validation.
|
||||||
// Returns the application root hash (result of abci.Commit)
|
// Returns the application root hash (result of abci.Commit)
|
||||||
|
// TODO handle abciResponses
|
||||||
func ApplyBlock(appConnConsensus proxy.AppConnConsensus, block *types.Block) ([]byte, error) {
|
func ApplyBlock(appConnConsensus proxy.AppConnConsensus, block *types.Block) ([]byte, error) {
|
||||||
var eventCache types.Fireable // nil
|
var eventCache types.Fireable // nil
|
||||||
_, _, err := execBlockOnProxyApp(eventCache, appConnConsensus, block)
|
_, err := execBlockOnProxyApp(eventCache, appConnConsensus, block)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("Error executing block on proxy app", "height", block.Height, "err", err)
|
log.Warn("Error executing block on proxy app", "height", block.Height, "err", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -6,6 +6,7 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
abci "github.com/tendermint/abci/types"
|
||||||
. "github.com/tendermint/go-common"
|
. "github.com/tendermint/go-common"
|
||||||
cfg "github.com/tendermint/go-config"
|
cfg "github.com/tendermint/go-config"
|
||||||
dbm "github.com/tendermint/go-db"
|
dbm "github.com/tendermint/go-db"
|
||||||
@ -16,7 +17,8 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
stateKey = []byte("stateKey")
|
stateKey = []byte("stateKey")
|
||||||
|
abciResponsesKey = []byte("abciResponsesKey")
|
||||||
)
|
)
|
||||||
|
|
||||||
//-----------------------------------------------------------------------------
|
//-----------------------------------------------------------------------------
|
||||||
@ -31,7 +33,7 @@ type State struct {
|
|||||||
GenesisDoc *types.GenesisDoc
|
GenesisDoc *types.GenesisDoc
|
||||||
ChainID string
|
ChainID string
|
||||||
|
|
||||||
// updated at end of ExecBlock
|
// updated at end of SetBlockAndValidators
|
||||||
LastBlockHeight int // Genesis state has this set to 0. So, Block(H=0) does not exist.
|
LastBlockHeight int // Genesis state has this set to 0. So, Block(H=0) does not exist.
|
||||||
LastBlockID types.BlockID
|
LastBlockID types.BlockID
|
||||||
LastBlockTime time.Time
|
LastBlockTime time.Time
|
||||||
@ -42,6 +44,10 @@ type State struct {
|
|||||||
AppHash []byte
|
AppHash []byte
|
||||||
|
|
||||||
TxIndexer txindex.TxIndexer `json:"-"` // Transaction indexer.
|
TxIndexer txindex.TxIndexer `json:"-"` // Transaction indexer.
|
||||||
|
|
||||||
|
// Intermediate results from processing
|
||||||
|
// Persisted separately from the state
|
||||||
|
abciResponses *ABCIResponses
|
||||||
}
|
}
|
||||||
|
|
||||||
func LoadState(db dbm.DB) *State {
|
func LoadState(db dbm.DB) *State {
|
||||||
@ -62,6 +68,8 @@ func loadState(db dbm.DB, key []byte) *State {
|
|||||||
}
|
}
|
||||||
// TODO: ensure that buf is completely read.
|
// TODO: ensure that buf is completely read.
|
||||||
}
|
}
|
||||||
|
|
||||||
|
s.LoadABCIResponses()
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -76,7 +84,8 @@ func (s *State) Copy() *State {
|
|||||||
Validators: s.Validators.Copy(),
|
Validators: s.Validators.Copy(),
|
||||||
LastValidators: s.LastValidators.Copy(),
|
LastValidators: s.LastValidators.Copy(),
|
||||||
AppHash: s.AppHash,
|
AppHash: s.AppHash,
|
||||||
TxIndexer: s.TxIndexer, // pointer here, not value
|
abciResponses: s.abciResponses, // pointer here, not value
|
||||||
|
TxIndexer: s.TxIndexer, // pointer here, not value
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -86,6 +95,37 @@ func (s *State) Save() {
|
|||||||
s.db.SetSync(stateKey, s.Bytes())
|
s.db.SetSync(stateKey, s.Bytes())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Sets the ABCIResponses in the state and writes them to disk
|
||||||
|
func (s *State) SaveABCIResponses(abciResponses *ABCIResponses) {
|
||||||
|
s.abciResponses = abciResponses
|
||||||
|
|
||||||
|
// save the validators to the db
|
||||||
|
s.db.SetSync(abciResponsesKey, s.abciResponses.Bytes())
|
||||||
|
|
||||||
|
// save the tx results using the TxIndexer
|
||||||
|
batch := txindexer.NewBatch()
|
||||||
|
for i, r := range s.abciResponses.TxResults {
|
||||||
|
tx := s.abciResponses.Txs[i]
|
||||||
|
batch.Index(tx.Hash(), *r)
|
||||||
|
}
|
||||||
|
s.TxIndexer.Batch(batch)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *State) LoadABCIResponses() {
|
||||||
|
s.abciResponses = new(ABCIResponses)
|
||||||
|
|
||||||
|
buf := s.db.Get(abciResponsesKey)
|
||||||
|
if len(buf) != 0 {
|
||||||
|
r, n, err := bytes.NewReader(buf), new(int), new(error)
|
||||||
|
wire.ReadBinaryPtr(&s.abciResponses.Validators, r, 0, n, err)
|
||||||
|
if *err != nil {
|
||||||
|
// DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED
|
||||||
|
Exit(Fmt("Data has been corrupted or its spec has changed: %v\n", *err))
|
||||||
|
}
|
||||||
|
// TODO: ensure that buf is completely read.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (s *State) Equals(s2 *State) bool {
|
func (s *State) Equals(s2 *State) bool {
|
||||||
return bytes.Equal(s.Bytes(), s2.Bytes())
|
return bytes.Equal(s.Bytes(), s2.Bytes())
|
||||||
}
|
}
|
||||||
@ -101,7 +141,21 @@ func (s *State) Bytes() []byte {
|
|||||||
|
|
||||||
// Mutate state variables to match block and validators
|
// Mutate state variables to match block and validators
|
||||||
// after running EndBlock
|
// after running EndBlock
|
||||||
func (s *State) SetBlockAndValidators(header *types.Header, blockPartsHeader types.PartSetHeader, prevValSet, nextValSet *types.ValidatorSet) {
|
func (s *State) SetBlockAndValidators(header *types.Header, blockPartsHeader types.PartSetHeader) {
|
||||||
|
|
||||||
|
// copy the valset
|
||||||
|
prevValSet := s.Validators.Copy()
|
||||||
|
nextValSet := prevValSet.Copy()
|
||||||
|
|
||||||
|
// update the validator set
|
||||||
|
err := updateValidators(nextValSet, s.abciResponses.Validators)
|
||||||
|
if err != nil {
|
||||||
|
log.Warn("Error changing validator set", "error", err)
|
||||||
|
// TODO: err or carry on?
|
||||||
|
}
|
||||||
|
// Update validator accums and set state variables
|
||||||
|
nextValSet.IncrementAccum(1)
|
||||||
|
|
||||||
s.setBlockAndValidators(header.Height,
|
s.setBlockAndValidators(header.Height,
|
||||||
types.BlockID{header.Hash(), blockPartsHeader}, header.Time,
|
types.BlockID{header.Hash(), blockPartsHeader}, header.Time,
|
||||||
prevValSet, nextValSet)
|
prevValSet, nextValSet)
|
||||||
@ -134,6 +188,33 @@ func GetState(config cfg.Config, stateDB dbm.DB) *State {
|
|||||||
return state
|
return state
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//--------------------------------------------------
|
||||||
|
// ABCIResponses holds intermediate state during block processing
|
||||||
|
|
||||||
|
type ABCIResponses struct {
|
||||||
|
Validators []*abci.Validator // changes to the validator set
|
||||||
|
|
||||||
|
Txs types.Txs // for reference later
|
||||||
|
TxResults []*types.TxResult // results of the txs, populated in the proxyCb
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewABCIResponses(block *types.Block) *ABCIResponses {
|
||||||
|
return &ABCIResponses{
|
||||||
|
Txs: block.Data.Txs,
|
||||||
|
TxResults: make([]*types.TxResult, block.NumTxs),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Serialize the list of validators
|
||||||
|
func (a *ABCIResponses) Bytes() []byte {
|
||||||
|
buf, n, err := new(bytes.Buffer), new(int), new(error)
|
||||||
|
wire.WriteBinary(a.Validators, buf, n, err)
|
||||||
|
if *err != nil {
|
||||||
|
PanicCrisis(*err)
|
||||||
|
}
|
||||||
|
return buf.Bytes()
|
||||||
|
}
|
||||||
|
|
||||||
//-----------------------------------------------------------------------------
|
//-----------------------------------------------------------------------------
|
||||||
// Genesis
|
// Genesis
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user