diff --git a/consensus/replay.go b/consensus/replay.go index 07308bf1..f117e66d 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -174,25 +174,6 @@ func makeHeightSearchFunc(height int) auto.SearchFunc { // by handshaking with the app to figure out where // we were last and using the WAL to recover there -// Replay the last block through the consensus and return the AppHash from after Commit. -func replayLastBlock(config cfg.Config, state *sm.State, proxyApp proxy.AppConnConsensus, blockStore types.BlockStore) ([]byte, error) { - mempool := types.MockMempool{} - cs := NewConsensusState(config, state, proxyApp, blockStore, mempool) - - evsw := types.NewEventSwitch() - evsw.Start() - defer evsw.Stop() - cs.SetEventSwitch(evsw) - newBlockCh := subscribeToEvent(evsw, "consensus-replay", types.EventStringNewBlock(), 1) - - // run through the WAL, commit new block, stop - cs.Start() - <-newBlockCh // TODO: use a timeout and return err? - cs.Stop() - - return cs.state.AppHash, nil -} - type Handshaker struct { config cfg.Config state *sm.State @@ -286,13 +267,13 @@ func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int, proxyApp p // We haven't run Commit (both the state and app are one block behind), // so run through consensus with the real app log.Info("Replay last block using real app") - return replayLastBlock(h.config, h.state, proxyApp.Consensus(), h.store) + return h.replayLastBlock(proxyApp.Consensus()) } else if appBlockHeight == storeBlockHeight { // We ran Commit, but didn't save the state, so run through consensus with mock app mockApp := newMockProxyApp(appHash) log.Info("Replay last block using mock app") - return replayLastBlock(h.config, h.state, mockApp, h.store) + return h.replayLastBlock(mockApp) } } @@ -316,28 +297,48 @@ func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, store } for i := appBlockHeight + 1; i <= finalBlock; i++ { log.Info("Applying block", "height", i) - h.nBlocks += 1 block := h.store.LoadBlock(i) appHash, err = sm.ApplyBlock(proxyApp.Consensus(), block) if err != nil { return nil, err } + + h.nBlocks += 1 } if useReplayFunc { // sync the final block - appHash, err = h.ReplayBlocks(appHash, finalBlock, proxyApp) - if err != nil { - return appHash, err - } + return h.ReplayBlocks(appHash, finalBlock, proxyApp) } return appHash, h.checkAppHash(appHash) } +// Replay the last block through the consensus and return the AppHash from after Commit. +func (h *Handshaker) replayLastBlock(proxyApp proxy.AppConnConsensus) ([]byte, error) { + mempool := types.MockMempool{} + cs := NewConsensusState(h.config, h.state, proxyApp, h.store, mempool) + + evsw := types.NewEventSwitch() + evsw.Start() + defer evsw.Stop() + cs.SetEventSwitch(evsw) + newBlockCh := subscribeToEvent(evsw, "consensus-replay", types.EventStringNewBlock(), 1) + + // run through the WAL, commit new block, stop + cs.Start() + <-newBlockCh // TODO: use a timeout and return err? + cs.Stop() + + h.nBlocks += 1 + + return cs.state.AppHash, nil +} + func (h *Handshaker) checkAppHash(appHash []byte) error { if !bytes.Equal(h.state.AppHash, appHash) { - return errors.New(Fmt("Tendermint state.AppHash does not match AppHash after replay. Got %X, expected %X", appHash, h.state.AppHash)) + panic(errors.New(Fmt("Tendermint state.AppHash does not match AppHash after replay. Got %X, expected %X", appHash, h.state.AppHash)).Error()) + return nil } return nil } diff --git a/consensus/replay_test.go b/consensus/replay_test.go index 2d812de9..c70b60fa 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -262,28 +262,41 @@ var ( //--------------------------------------- // Test handshake/replay +// 0 - all synced up +// 1 - saved block but app and state are behind +// 2 - save block and committed but state is behind +var modes = []uint{0, 1, 2} + // Sync from scratch func TestHandshakeReplayAll(t *testing.T) { - testHandshakeReplay(t, 0) + for _, m := range modes { + testHandshakeReplay(t, 0, m) + } } // Sync many, not from scratch func TestHandshakeReplaySome(t *testing.T) { - testHandshakeReplay(t, 1) + for _, m := range modes { + testHandshakeReplay(t, 1, m) + } } // Sync from lagging by one func TestHandshakeReplayOne(t *testing.T) { - testHandshakeReplay(t, NUM_BLOCKS-1) + for _, m := range modes { + testHandshakeReplay(t, NUM_BLOCKS-1, m) + } } // Sync from caught up func TestHandshakeReplayNone(t *testing.T) { - testHandshakeReplay(t, NUM_BLOCKS) + for _, m := range modes { + testHandshakeReplay(t, NUM_BLOCKS, m) + } } // Make some blocks. Start a fresh app and apply nBlocks blocks. Then restart the app and sync it up with the remaining blocks -func testHandshakeReplay(t *testing.T, nBlocks int) { +func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) { config := tendermint_test.ResetConfig("proxy_test_") // copy the many_blocks file @@ -310,44 +323,23 @@ func testHandshakeReplay(t *testing.T, nBlocks int) { store.chain = chain store.commits = commits - // run the whole chain against this client to build up the tendermint state - clientCreator := proxy.NewLocalClientCreator(dummy.NewPersistentDummyApplication(path.Join(config.GetString("db_dir"), "1"))) - proxyApp := proxy.NewAppConns(config, clientCreator, nil) // sm.NewHandshaker(config, state, store, ReplayLastBlock)) - if _, err := proxyApp.Start(); err != nil { - t.Fatalf("Error starting proxy app connections: %v", err) - } - for _, block := range chain { - err := state.ApplyBlock(nil, proxyApp.Consensus(), block, block.MakePartSet(testPartSize).Header(), mempool) - if err != nil { - t.Fatal(err) - } - } - proxyApp.Stop() - latestAppHash := state.AppHash + // run the chain through state.ApplyBlock to build up the tendermint state + latestAppHash := buildTMStateFromChain(config, state, chain, mode) - // run nBlocks against a new client to build up the app state. - // use a throwaway tendermint state - clientCreator2 := proxy.NewLocalClientCreator(dummy.NewPersistentDummyApplication(path.Join(config.GetString("db_dir"), "2"))) + // make a new client creator + dummyApp := dummy.NewPersistentDummyApplication(path.Join(config.GetString("db_dir"), "2")) + clientCreator2 := proxy.NewLocalClientCreator(dummyApp) if nBlocks > 0 { - // start a new app without handshake, play nBlocks blocks + // run nBlocks against a new client to build up the app state. + // use a throwaway tendermint state proxyApp := proxy.NewAppConns(config, clientCreator2, nil) - if _, err := proxyApp.Start(); err != nil { - t.Fatalf("Error starting proxy app connections: %v", err) - } - state2, _ := stateAndStore(config, privVal.PubKey) - for i := 0; i < nBlocks; i++ { - block := chain[i] - err := state2.ApplyBlock(nil, proxyApp.Consensus(), block, block.MakePartSet(testPartSize).Header(), mempool) - if err != nil { - t.Fatal(err) - } - } - proxyApp.Stop() + state, _ := stateAndStore(config, privVal.PubKey) + buildAppStateFromChain(proxyApp, state, chain, nBlocks, mode) } // now start the app using the handshake - it should sync handshaker := NewHandshaker(config, state, store) - proxyApp = proxy.NewAppConns(config, clientCreator2, handshaker) + proxyApp := proxy.NewAppConns(config, clientCreator2, handshaker) if _, err := proxyApp.Start(); err != nil { t.Fatalf("Error starting proxy app connections: %v", err) } @@ -363,9 +355,87 @@ func testHandshakeReplay(t *testing.T, nBlocks int) { t.Fatalf("Expected app hashes to match after handshake/replay. got %X, expected %X", res.LastBlockAppHash, latestAppHash) } - if handshaker.NBlocks() != NUM_BLOCKS-nBlocks { - t.Fatalf("Expected handshake to sync %d blocks, got %d", NUM_BLOCKS-nBlocks, handshaker.NBlocks()) + expectedBlocksToSync := NUM_BLOCKS - nBlocks + if nBlocks == NUM_BLOCKS && mode > 0 { + expectedBlocksToSync += 1 + } else if nBlocks > 0 && mode == 1 { + expectedBlocksToSync += 1 } + + if handshaker.NBlocks() != expectedBlocksToSync { + t.Fatalf("Expected handshake to sync %d blocks, got %d", expectedBlocksToSync, handshaker.NBlocks()) + } +} + +func applyBlock(st *sm.State, blk *types.Block, proxyApp proxy.AppConns) { + err := st.ApplyBlock(nil, proxyApp.Consensus(), blk, blk.MakePartSet(testPartSize).Header(), mempool) + if err != nil { + panic(err) + } +} + +func buildAppStateFromChain(proxyApp proxy.AppConns, + state *sm.State, chain []*types.Block, nBlocks int, mode uint) { + // start a new app without handshake, play nBlocks blocks + if _, err := proxyApp.Start(); err != nil { + panic(err) + } + defer proxyApp.Stop() + switch mode { + case 0: + for i := 0; i < nBlocks; i++ { + block := chain[i] + applyBlock(state, block, proxyApp) + } + case 1, 2: + for i := 0; i < nBlocks-1; i++ { + block := chain[i] + applyBlock(state, block, proxyApp) + } + + if mode == 2 { + // update the dummy height and apphash + // as if we ran commit but not + applyBlock(state, chain[nBlocks-1], proxyApp) + } + } + +} + +func buildTMStateFromChain(config cfg.Config, state *sm.State, chain []*types.Block, mode uint) []byte { + // run the whole chain against this client to build up the tendermint state + clientCreator := proxy.NewLocalClientCreator(dummy.NewPersistentDummyApplication(path.Join(config.GetString("db_dir"), "1"))) + proxyApp := proxy.NewAppConns(config, clientCreator, nil) // sm.NewHandshaker(config, state, store, ReplayLastBlock)) + if _, err := proxyApp.Start(); err != nil { + panic(err) + } + defer proxyApp.Stop() + + var latestAppHash []byte + + switch mode { + case 0: + // sync right up + for _, block := range chain { + applyBlock(state, block, proxyApp) + } + + latestAppHash = state.AppHash + case 1, 2: + // sync up to the penultimate as if we stored the block. + // whether we commit or not depends on the appHash + for _, block := range chain[:len(chain)-1] { + applyBlock(state, block, proxyApp) + } + + // apply the final block to a state copy so we can + // get the right next appHash but keep the state back + stateCopy := state.Copy() + applyBlock(stateCopy, chain[len(chain)-1], proxyApp) + latestAppHash = stateCopy.AppHash + } + + return latestAppHash } //--------------------------