mirror of
https://github.com/fluencelabs/tendermint
synced 2025-07-31 04:01:55 +00:00
make ReplayBlocks logic exhaustive
This commit is contained in:
@@ -15,7 +15,7 @@ import (
|
||||
"github.com/tendermint/tendermint/config/tendermint_test"
|
||||
|
||||
"github.com/tendermint/abci/example/dummy"
|
||||
. "github.com/tendermint/go-common"
|
||||
cmn "github.com/tendermint/go-common"
|
||||
cfg "github.com/tendermint/go-config"
|
||||
"github.com/tendermint/go-crypto"
|
||||
dbm "github.com/tendermint/go-db"
|
||||
@@ -37,7 +37,7 @@ func init() {
|
||||
// the `Handshake Tests` are for failures in applying the block.
|
||||
// With the help of the WAL, we can recover from it all!
|
||||
|
||||
var data_dir = path.Join(GoPath, "src/github.com/tendermint/tendermint/consensus", "test_data")
|
||||
var data_dir = path.Join(cmn.GoPath, "src/github.com/tendermint/tendermint/consensus", "test_data")
|
||||
|
||||
//------------------------------------------------------------------------------------------
|
||||
// WAL Tests
|
||||
@@ -68,7 +68,7 @@ type testCase struct {
|
||||
|
||||
func newTestCase(name string, stepChanges []int) *testCase {
|
||||
if len(stepChanges) != 3 {
|
||||
panic(Fmt("a full wal has 3 step changes! Got array %v", stepChanges))
|
||||
panic(cmn.Fmt("a full wal has 3 step changes! Got array %v", stepChanges))
|
||||
}
|
||||
return &testCase{
|
||||
name: name,
|
||||
@@ -103,15 +103,15 @@ func readWAL(p string) string {
|
||||
|
||||
func writeWAL(walMsgs string) string {
|
||||
tempDir := os.TempDir()
|
||||
walDir := path.Join(tempDir, "/wal"+RandStr(12))
|
||||
walDir := path.Join(tempDir, "/wal"+cmn.RandStr(12))
|
||||
walFile := path.Join(walDir, "wal")
|
||||
// Create WAL directory
|
||||
err := EnsureDir(walDir, 0700)
|
||||
err := cmn.EnsureDir(walDir, 0700)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
// Write the needed WAL to file
|
||||
err = WriteFile(walFile, []byte(walMsgs), 0600)
|
||||
err = cmn.WriteFile(walFile, []byte(walMsgs), 0600)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
@@ -123,7 +123,7 @@ func waitForBlock(newBlockCh chan interface{}, thisCase *testCase, i int) {
|
||||
select {
|
||||
case <-newBlockCh:
|
||||
case <-after:
|
||||
panic(Fmt("Timed out waiting for new block for case '%s' line %d", thisCase.name, i))
|
||||
panic(cmn.Fmt("Timed out waiting for new block for case '%s' line %d", thisCase.name, i))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -156,7 +156,7 @@ func toPV(pv PrivValidator) *types.PrivValidator {
|
||||
|
||||
func setupReplayTest(thisCase *testCase, nLines int, crashAfter bool) (*ConsensusState, chan interface{}, string, string) {
|
||||
fmt.Println("-------------------------------------")
|
||||
log.Notice(Fmt("Starting replay test %v (of %d lines of WAL). Crash after = %v", thisCase.name, nLines, crashAfter))
|
||||
log.Notice(cmn.Fmt("Starting replay test %v (of %d lines of WAL). Crash after = %v", thisCase.name, nLines, crashAfter))
|
||||
|
||||
lineStep := nLines
|
||||
if crashAfter {
|
||||
@@ -285,8 +285,15 @@ func TestHandshakeReplayNone(t *testing.T) {
|
||||
// Make some blocks. Start a fresh app and apply nBlocks blocks. Then restart the app and sync it up with the remaining blocks
|
||||
func testHandshakeReplay(t *testing.T, nBlocks int) {
|
||||
config := tendermint_test.ResetConfig("proxy_test_")
|
||||
walFile := path.Join(data_dir, "many_blocks.cswal")
|
||||
|
||||
// copy the many_blocks file
|
||||
walBody, err := cmn.ReadFile(path.Join(data_dir, "many_blocks.cswal"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
walFile := writeWAL(string(walBody))
|
||||
config.Set("cs_wal_file", walFile)
|
||||
|
||||
privVal := types.LoadPrivValidator(config.GetString("priv_validator_file"))
|
||||
testPartSize = config.GetInt("block_part_size")
|
||||
|
||||
@@ -371,7 +378,7 @@ func makeBlockchainFromWAL(wal *WAL) ([]*types.Block, []*types.Commit, error) {
|
||||
return nil, nil, err
|
||||
}
|
||||
if !found {
|
||||
return nil, nil, errors.New(Fmt("WAL does not contain height %d.", 1))
|
||||
return nil, nil, errors.New(cmn.Fmt("WAL does not contain height %d.", 1))
|
||||
}
|
||||
defer gr.Close()
|
||||
|
||||
|
@@ -253,7 +253,8 @@ type ConsensusState struct {
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
func ReplayLastBlock(config cfg.Config, state *sm.State, proxyApp proxy.AppConnConsensus, blockStore sm.BlockStore) {
|
||||
// Replay the last block through the consensus and return the AppHash after commit.
|
||||
func ReplayLastBlock(config cfg.Config, state *sm.State, proxyApp proxy.AppConnConsensus, blockStore sm.BlockStore) ([]byte, error) {
|
||||
mempool := sm.MockMempool{}
|
||||
cs := NewConsensusState(config, state, proxyApp, blockStore, mempool)
|
||||
|
||||
@@ -265,8 +266,10 @@ func ReplayLastBlock(config cfg.Config, state *sm.State, proxyApp proxy.AppConnC
|
||||
|
||||
// run through the WAL, commit new block, stop
|
||||
cs.Start()
|
||||
<-newBlockCh
|
||||
<-newBlockCh // TODO: use a timeout and return err?
|
||||
cs.Stop()
|
||||
|
||||
return cs.state.AppHash, nil
|
||||
}
|
||||
|
||||
func NewConsensusState(config cfg.Config, state *sm.State, proxyAppConn proxy.AppConnConsensus, blockStore sm.BlockStore, mempool sm.Mempool) *ConsensusState {
|
||||
@@ -1235,7 +1238,7 @@ func (cs *ConsensusState) finalizeCommit(height int) {
|
||||
cs.blockStore.SaveBlock(block, blockParts, seenCommit)
|
||||
} else {
|
||||
// Happens during replay if we already saved the block but didn't commit
|
||||
log.Notice("Calling finalizeCommit on already stored block", "height", block.Height)
|
||||
log.Info("Calling finalizeCommit on already stored block", "height", block.Height)
|
||||
}
|
||||
|
||||
fail.Fail() // XXX
|
||||
@@ -1250,7 +1253,7 @@ func (cs *ConsensusState) finalizeCommit(height int) {
|
||||
// NOTE: the block.AppHash wont reflect these txs until the next block
|
||||
err := stateCopy.ApplyBlock(eventCache, cs.proxyAppConn, block, blockParts.Header(), cs.mempool)
|
||||
if err != nil {
|
||||
log.Warn("Error on ApplyBlock. Did the application crash? Please restart tendermint", "error", err)
|
||||
log.Error("Error on ApplyBlock. Did the application crash? Please restart tendermint", "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
|
@@ -260,6 +260,7 @@ func (s *State) CommitStateUpdateMempool(proxyAppConn proxy.AppConnConsensus, bl
|
||||
log.Debug("Commit.Log: " + res.Log)
|
||||
}
|
||||
|
||||
log.Info("Committed state", "hash", res.Data)
|
||||
// Set the state's new AppHash
|
||||
s.AppHash = res.Data
|
||||
|
||||
@@ -327,7 +328,8 @@ type BlockStore interface {
|
||||
LoadSeenCommit(height int) *types.Commit
|
||||
}
|
||||
|
||||
type blockReplayFunc func(cfg.Config, *State, proxy.AppConnConsensus, BlockStore)
|
||||
// returns the apphash from Commit
|
||||
type blockReplayFunc func(cfg.Config, *State, proxy.AppConnConsensus, BlockStore) ([]byte, error)
|
||||
|
||||
type Handshaker struct {
|
||||
config cfg.Config
|
||||
@@ -362,73 +364,120 @@ func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error {
|
||||
// TODO: check version
|
||||
|
||||
// replay blocks up to the latest in the blockstore
|
||||
err = h.ReplayBlocks(appHash, blockHeight, proxyApp)
|
||||
appHash, err = h.ReplayBlocks(appHash, blockHeight, proxyApp)
|
||||
if err != nil {
|
||||
return errors.New(Fmt("Error on replay: %v", err))
|
||||
}
|
||||
|
||||
// NOTE: the h.state may now be behind the cs state
|
||||
|
||||
// TODO: (on restart) replay mempool
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Replay all blocks after blockHeight and ensure the result matches the current state.
|
||||
func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int, proxyApp proxy.AppConns) error {
|
||||
// Returns the final AppHash or an error
|
||||
func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int, proxyApp proxy.AppConns) ([]byte, error) {
|
||||
|
||||
storeBlockHeight := h.store.Height()
|
||||
stateBlockHeight := h.state.LastBlockHeight
|
||||
log.Notice("ABCI Replay Blocks", "appHeight", appBlockHeight, "storeHeight", storeBlockHeight, "stateHeight", stateBlockHeight)
|
||||
|
||||
// First handle edge cases and constraints on the storeBlockHeight
|
||||
if storeBlockHeight == 0 {
|
||||
return nil
|
||||
return nil, nil
|
||||
|
||||
} else if storeBlockHeight < appBlockHeight {
|
||||
// if the app is ahead, there's nothing we can do
|
||||
return ErrAppBlockHeightTooHigh{storeBlockHeight, appBlockHeight}
|
||||
// the app should never be ahead of the store (but this is under app's control)
|
||||
return nil, ErrAppBlockHeightTooHigh{storeBlockHeight, appBlockHeight}
|
||||
|
||||
} else if storeBlockHeight == appBlockHeight && storeBlockHeight == stateBlockHeight {
|
||||
// all good!
|
||||
return nil
|
||||
} else if storeBlockHeight < stateBlockHeight {
|
||||
// the state should never be ahead of the store (this is under tendermint's control)
|
||||
PanicSanity(Fmt("StateBlockHeight (%d) > StoreBlockHeight (%d)", stateBlockHeight, storeBlockHeight))
|
||||
|
||||
} else if storeBlockHeight == appBlockHeight && storeBlockHeight == stateBlockHeight+1 {
|
||||
// We already ran Commit, but didn't save the state, so run through consensus with mock app
|
||||
mockApp := newMockProxyApp(appHash)
|
||||
log.Info("Replay last block using mock app")
|
||||
h.replayLastBlock(h.config, h.state, mockApp, h.store)
|
||||
|
||||
} else if storeBlockHeight == appBlockHeight+1 && storeBlockHeight == stateBlockHeight+1 {
|
||||
// We crashed after saving the block
|
||||
// but before Commit (both the state and app are behind),
|
||||
// so run through consensus with the real app
|
||||
log.Info("Replay last block using real app")
|
||||
h.replayLastBlock(h.config, h.state, proxyApp.Consensus(), h.store)
|
||||
|
||||
} else {
|
||||
// store is more than one ahead,
|
||||
// so app wants to replay many blocks.
|
||||
// replay all blocks from appBlockHeight+1 to storeBlockHeight-1.
|
||||
// Replay the final block through consensus
|
||||
|
||||
var appHash []byte
|
||||
var err error
|
||||
for i := appBlockHeight + 1; i <= storeBlockHeight; i++ {
|
||||
log.Info("Applying block", "height", i)
|
||||
h.nBlocks += 1
|
||||
block := h.store.LoadBlock(i)
|
||||
appHash, err = applyBlock(proxyApp.Consensus(), block)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: should we be playing the final block through the consensus, instead of using applyBlock?
|
||||
// h.replayLastBlock(h.config, h.state, proxyApp.Consensus(), h.store)
|
||||
|
||||
if !bytes.Equal(h.state.AppHash, appHash) {
|
||||
return errors.New(Fmt("Tendermint state.AppHash does not match AppHash after replay. Got %X, expected %X", appHash, h.state.AppHash))
|
||||
}
|
||||
return nil
|
||||
} else if storeBlockHeight > stateBlockHeight+1 {
|
||||
// store should be at most one ahead of the state (this is under tendermint's control)
|
||||
PanicSanity(Fmt("StoreBlockHeight (%d) > StateBlockHeight + 1 (%d)", storeBlockHeight, stateBlockHeight+1))
|
||||
}
|
||||
return nil
|
||||
|
||||
// Now either store is equal to state, or one ahead.
|
||||
// For each, consider all cases of where the app could be, given app <= store
|
||||
if storeBlockHeight == stateBlockHeight {
|
||||
// Tendermint ran Commit and saved the state.
|
||||
// Either the app is asking for replay, or we're all synced up.
|
||||
if appBlockHeight < storeBlockHeight {
|
||||
// the app is behind, so replay blocks, but no need to go through WAL (state is already synced to store)
|
||||
return h.replayBlocks(proxyApp, appBlockHeight, storeBlockHeight, false)
|
||||
|
||||
} else if appBlockHeight == storeBlockHeight {
|
||||
// we're good!
|
||||
return appHash, nil
|
||||
}
|
||||
|
||||
} else if storeBlockHeight == stateBlockHeight+1 {
|
||||
// We saved the block in the store but haven't updated the state,
|
||||
// so we'll need to replay a block using the WAL.
|
||||
if appBlockHeight < stateBlockHeight {
|
||||
// the app is further behind than it should be, so replay blocks
|
||||
// but leave the last block to go through the WAL
|
||||
return h.replayBlocks(proxyApp, appBlockHeight, storeBlockHeight, true)
|
||||
|
||||
} else if appBlockHeight == stateBlockHeight {
|
||||
// We haven't run Commit (both the state and app are one block behind),
|
||||
// so run through consensus with the real app
|
||||
log.Info("Replay last block using real app")
|
||||
return h.replayLastBlock(h.config, h.state, proxyApp.Consensus(), h.store)
|
||||
|
||||
} else if appBlockHeight == storeBlockHeight {
|
||||
// We ran Commit, but didn't save the state, so run through consensus with mock app
|
||||
mockApp := newMockProxyApp(appHash)
|
||||
log.Info("Replay last block using mock app")
|
||||
return h.replayLastBlock(h.config, h.state, mockApp, h.store)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
PanicSanity("Should never happen")
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, storeBlockHeight int, useReplayFunc bool) ([]byte, error) {
|
||||
// App is further behind than it should be, so we need to replay blocks.
|
||||
// We replay all blocks from appBlockHeight+1 to storeBlockHeight-1,
|
||||
// and let the final block be replayed through ReplayBlocks.
|
||||
// Note that we don't have an old version of the state,
|
||||
// so we by-pass state validation using applyBlock here.
|
||||
|
||||
var appHash []byte
|
||||
var err error
|
||||
finalBlock := storeBlockHeight
|
||||
if useReplayFunc {
|
||||
finalBlock -= 1
|
||||
}
|
||||
for i := appBlockHeight + 1; i <= finalBlock; i++ {
|
||||
log.Info("Applying block", "height", i)
|
||||
h.nBlocks += 1
|
||||
block := h.store.LoadBlock(i)
|
||||
appHash, err = applyBlock(proxyApp.Consensus(), block)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if useReplayFunc {
|
||||
// sync the final block
|
||||
appHash, err = h.ReplayBlocks(appHash, finalBlock, proxyApp)
|
||||
if err != nil {
|
||||
return appHash, err
|
||||
}
|
||||
}
|
||||
|
||||
if !bytes.Equal(h.state.AppHash, appHash) {
|
||||
return nil, errors.New(Fmt("Tendermint state.AppHash does not match AppHash after replay. Got %X, expected %X", appHash, h.state.AppHash))
|
||||
}
|
||||
|
||||
return appHash, nil
|
||||
}
|
||||
|
||||
//--------------------------------------------------------------------------------
|
||||
|
Reference in New Issue
Block a user