diff --git a/blockchain/reactor.go b/blockchain/reactor.go index 90258825..f88bccc3 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -251,7 +251,6 @@ FOR_LOOP: // TODO This is bad, are we zombie? cmn.PanicQ(cmn.Fmt("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err)) } - bcR.state.Save() } } continue FOR_LOOP diff --git a/consensus/state.go b/consensus/state.go index d6943581..fdbf4309 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -1221,7 +1221,7 @@ func (cs *ConsensusState) finalizeCommit(height int) { stateCopy := cs.state.Copy() eventCache := types.NewEventCache(cs.evsw) - // Execute and commit the block, and update the mempool. + // Execute and commit the block, update and save the state, and update the mempool. // All calls to the proxyAppConn should come here. // NOTE: the block.AppHash wont reflect these txs until the next block err := stateCopy.ApplyBlock(eventCache, cs.proxyAppConn, block, blockParts.Header(), cs.mempool) @@ -1233,14 +1233,10 @@ func (cs *ConsensusState) finalizeCommit(height int) { fail.Fail() // XXX // Fire off event for new block. - // TODO: Handle app failure. See #177 types.FireEventNewBlock(cs.evsw, types.EventDataNewBlock{block}) types.FireEventNewBlockHeader(cs.evsw, types.EventDataNewBlockHeader{block.Header}) eventCache.Flush() - // Save the state. - stateCopy.Save() - fail.Fail() // XXX // NewHeightStep! diff --git a/state/execution.go b/state/execution.go index 25d5dcd3..943a64f7 100644 --- a/state/execution.go +++ b/state/execution.go @@ -13,59 +13,39 @@ import ( "github.com/tendermint/tendermint/types" ) -// ExecBlock executes the block to mutate State. +//-------------------------------------------------- +// Execute the block + +// ExecBlock executes the block, but does NOT mutate State. // + validates the block // + executes block.Txs on the proxyAppConn -// + updates validator sets -// + returns block.Txs results -func (s *State) ExecBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block, blockPartsHeader types.PartSetHeader) ([]*types.TxResult, error) { +func (s *State) ExecBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block) (*ABCIResponses, error) { // Validate the block. if err := s.validateBlock(block); err != nil { return nil, ErrInvalidBlock(err) } - // compute bitarray of validators that signed - signed := commitBitArrayFromBlock(block) - _ = signed // TODO send on begin block - - // copy the valset - valSet := s.Validators.Copy() - nextValSet := valSet.Copy() - // Execute the block txs - txResults, changedValidators, err := execBlockOnProxyApp(eventCache, proxyAppConn, block) + abciResponses, err := execBlockOnProxyApp(eventCache, proxyAppConn, block) if err != nil { // There was some error in proxyApp // TODO Report error and wait for proxyApp to be available. return nil, ErrProxyAppConn(err) } - // update the validator set - err = updateValidators(nextValSet, changedValidators) - if err != nil { - log.Warn("Error changing validator set", "error", err) - // TODO: err or carry on? - } - - // All good! - // Update validator accums and set state variables - nextValSet.IncrementAccum(1) - s.SetBlockAndValidators(block.Header, blockPartsHeader, valSet, nextValSet) - - fail.Fail() // XXX - - return txResults, nil + return abciResponses, nil } // Executes block's transactions on proxyAppConn. // Returns a list of transaction results and updates to the validator set // TODO: Generate a bitmap or otherwise store tx validity in state. -func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block) ([]*types.TxResult, []*abci.Validator, error) { +func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block) (*ABCIResponses, error) { var validTxs, invalidTxs = 0, 0 - txResults := make([]*types.TxResult, len(block.Txs)) txIndex := 0 + abciResponses := NewABCIResponses(block) + // Execute transactions and get hash proxyCb := func(req *abci.Request, res *abci.Response) { switch r := res.Value.(type) { @@ -84,12 +64,7 @@ func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnCo txError = txResult.Code.String() } - txResults[txIndex] = &types.TxResult{ - Height: uint64(block.Height), - Index: uint32(txIndex), - Tx: req.GetDeliverTx().Tx, - Result: *txResult, - } + abciResponses.TxResults[txIndex] = &types.TxResult{uint64(block.Height), uint32(txIndex), *txResult} txIndex++ // NOTE: if we count we can access the tx from the block instead of @@ -111,7 +86,7 @@ func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnCo err := proxyAppConn.BeginBlockSync(block.Hash(), types.TM2PB.Header(block.Header)) if err != nil { log.Warn("Error in proxyAppConn.BeginBlock", "error", err) - return nil, nil, err + return nil, err } fail.Fail() // XXX @@ -121,7 +96,7 @@ func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnCo fail.FailRand(len(block.Txs)) // XXX proxyAppConn.DeliverTxAsync(tx) if err := proxyAppConn.Error(); err != nil { - return nil, nil, err + return nil, err } } @@ -131,7 +106,7 @@ func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnCo respEndBlock, err := proxyAppConn.EndBlockSync(uint64(block.Height)) if err != nil { log.Warn("Error in proxyAppConn.EndBlock", "error", err) - return nil, nil, err + return nil, err } fail.Fail() // XXX @@ -140,7 +115,9 @@ func execBlockOnProxyApp(eventCache types.Fireable, proxyAppConn proxy.AppConnCo if len(respEndBlock.Diffs) > 0 { log.Info("Update to validator set", "updates", abci.ValidatorsString(respEndBlock.Diffs)) } - return txResults, respEndBlock.Diffs, nil + abciResponses.Validators = respEndBlock.Diffs + + return abciResponses, nil } func updateValidators(validators *types.ValidatorSet, changedValidators []*abci.Validator) error { @@ -230,16 +207,30 @@ func (s *State) validateBlock(block *types.Block) error { return nil } -// ApplyBlock executes the block, then commits and updates the mempool -// atomically, optionally indexing transaction results. +//----------------------------------------------------------------------------- +// ApplyBlock executes the block, updates state w/ ABCI responses, +// then commits and updates the mempool atomically, then saves state. +// Transaction results are optionally indexed. + +// Execute and commit block against app, save block and state func (s *State) ApplyBlock(eventCache types.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block, partsHeader types.PartSetHeader, mempool types.Mempool) error { - txResults, err := s.ExecBlock(eventCache, proxyAppConn, block, partsHeader) + abciResponses, err := s.ExecBlock(eventCache, proxyAppConn, block) if err != nil { return fmt.Errorf("Exec failed for application: %v", err) } + fail.Fail() // XXX + + // save the results before we commit + s.SaveABCIResponses(abciResponses) + + fail.Fail() // XXX + + // now update the block and validators + s.SetBlockAndValidators(block.Header, partsHeader) + // lock mempool, commit state, update mempoool err = s.CommitStateUpdateMempool(proxyAppConn, block, mempool) if err != nil { @@ -252,6 +243,11 @@ func (s *State) ApplyBlock(eventCache types.Fireable, proxyAppConn proxy.AppConn } s.TxIndexer.AddBatch(batch) + fail.Fail() // XXX + + // save the state + s.Save() + return nil } @@ -284,9 +280,10 @@ func (s *State) CommitStateUpdateMempool(proxyAppConn proxy.AppConnConsensus, bl // Apply and commit a block, but without all the state validation. // Returns the application root hash (result of abci.Commit) +// TODO handle abciResponses func ApplyBlock(appConnConsensus proxy.AppConnConsensus, block *types.Block) ([]byte, error) { var eventCache types.Fireable // nil - _, _, err := execBlockOnProxyApp(eventCache, appConnConsensus, block) + _, err := execBlockOnProxyApp(eventCache, appConnConsensus, block) if err != nil { log.Warn("Error executing block on proxy app", "height", block.Height, "err", err) return nil, err diff --git a/state/state.go b/state/state.go index 14b324d5..1b90d340 100644 --- a/state/state.go +++ b/state/state.go @@ -6,6 +6,7 @@ import ( "sync" "time" + abci "github.com/tendermint/abci/types" . "github.com/tendermint/go-common" cfg "github.com/tendermint/go-config" dbm "github.com/tendermint/go-db" @@ -16,7 +17,8 @@ import ( ) var ( - stateKey = []byte("stateKey") + stateKey = []byte("stateKey") + abciResponsesKey = []byte("abciResponsesKey") ) //----------------------------------------------------------------------------- @@ -31,7 +33,7 @@ type State struct { GenesisDoc *types.GenesisDoc ChainID string - // updated at end of ExecBlock + // updated at end of SetBlockAndValidators LastBlockHeight int // Genesis state has this set to 0. So, Block(H=0) does not exist. LastBlockID types.BlockID LastBlockTime time.Time @@ -42,6 +44,10 @@ type State struct { AppHash []byte TxIndexer txindex.TxIndexer `json:"-"` // Transaction indexer. + + // Intermediate results from processing + // Persisted separately from the state + abciResponses *ABCIResponses } func LoadState(db dbm.DB) *State { @@ -62,6 +68,8 @@ func loadState(db dbm.DB, key []byte) *State { } // TODO: ensure that buf is completely read. } + + s.LoadABCIResponses() return s } @@ -76,7 +84,8 @@ func (s *State) Copy() *State { Validators: s.Validators.Copy(), LastValidators: s.LastValidators.Copy(), AppHash: s.AppHash, - TxIndexer: s.TxIndexer, // pointer here, not value + abciResponses: s.abciResponses, // pointer here, not value + TxIndexer: s.TxIndexer, // pointer here, not value } } @@ -86,6 +95,37 @@ func (s *State) Save() { s.db.SetSync(stateKey, s.Bytes()) } +// Sets the ABCIResponses in the state and writes them to disk +func (s *State) SaveABCIResponses(abciResponses *ABCIResponses) { + s.abciResponses = abciResponses + + // save the validators to the db + s.db.SetSync(abciResponsesKey, s.abciResponses.Bytes()) + + // save the tx results using the TxIndexer + batch := txindexer.NewBatch() + for i, r := range s.abciResponses.TxResults { + tx := s.abciResponses.Txs[i] + batch.Index(tx.Hash(), *r) + } + s.TxIndexer.Batch(batch) +} + +func (s *State) LoadABCIResponses() { + s.abciResponses = new(ABCIResponses) + + buf := s.db.Get(abciResponsesKey) + if len(buf) != 0 { + r, n, err := bytes.NewReader(buf), new(int), new(error) + wire.ReadBinaryPtr(&s.abciResponses.Validators, r, 0, n, err) + if *err != nil { + // DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED + Exit(Fmt("Data has been corrupted or its spec has changed: %v\n", *err)) + } + // TODO: ensure that buf is completely read. + } +} + func (s *State) Equals(s2 *State) bool { return bytes.Equal(s.Bytes(), s2.Bytes()) } @@ -101,7 +141,21 @@ func (s *State) Bytes() []byte { // Mutate state variables to match block and validators // after running EndBlock -func (s *State) SetBlockAndValidators(header *types.Header, blockPartsHeader types.PartSetHeader, prevValSet, nextValSet *types.ValidatorSet) { +func (s *State) SetBlockAndValidators(header *types.Header, blockPartsHeader types.PartSetHeader) { + + // copy the valset + prevValSet := s.Validators.Copy() + nextValSet := prevValSet.Copy() + + // update the validator set + err := updateValidators(nextValSet, s.abciResponses.Validators) + if err != nil { + log.Warn("Error changing validator set", "error", err) + // TODO: err or carry on? + } + // Update validator accums and set state variables + nextValSet.IncrementAccum(1) + s.setBlockAndValidators(header.Height, types.BlockID{header.Hash(), blockPartsHeader}, header.Time, prevValSet, nextValSet) @@ -134,6 +188,33 @@ func GetState(config cfg.Config, stateDB dbm.DB) *State { return state } +//-------------------------------------------------- +// ABCIResponses holds intermediate state during block processing + +type ABCIResponses struct { + Validators []*abci.Validator // changes to the validator set + + Txs types.Txs // for reference later + TxResults []*types.TxResult // results of the txs, populated in the proxyCb +} + +func NewABCIResponses(block *types.Block) *ABCIResponses { + return &ABCIResponses{ + Txs: block.Data.Txs, + TxResults: make([]*types.TxResult, block.NumTxs), + } +} + +// Serialize the list of validators +func (a *ABCIResponses) Bytes() []byte { + buf, n, err := new(bytes.Buffer), new(int), new(error) + wire.WriteBinary(a.Validators, buf, n, err) + if *err != nil { + PanicCrisis(*err) + } + return buf.Bytes() +} + //----------------------------------------------------------------------------- // Genesis