diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 0f1ee28f..f777c6e0 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -28,4 +28,5 @@ - [p2p] \#3532 limit the number of attempts to connect to a peer in seed mode to 16 (as a result, the node will stop retrying after a 35 hours time window) - [consensus] \#2723, \#3451 and \#3317 Fix non-deterministic tests -- [pex] \#3603 Dial seeds when addrbook needs more addresses (@defunctzombie) \ No newline at end of file +- [consensus] \#3067 getBeginBlockValidatorInfo loads validators from stateDB instead of state +- [pex] \#3603 Dial seeds when addrbook needs more addresses (@defunctzombie) diff --git a/blockchain/reactor_test.go b/blockchain/reactor_test.go index 1b3aee56..9cd3489c 100644 --- a/blockchain/reactor_test.go +++ b/blockchain/reactor_test.go @@ -91,8 +91,10 @@ func newBlockchainReactor(logger log.Logger, genDoc *types.GenesisDoc, privVals // NOTE we have to create and commit the blocks first because // pool.height is determined from the store. fastSync := true - blockExec := sm.NewBlockExecutor(dbm.NewMemDB(), log.TestingLogger(), proxyApp.Consensus(), + db := dbm.NewMemDB() + blockExec := sm.NewBlockExecutor(db, log.TestingLogger(), proxyApp.Consensus(), sm.MockMempool{}, sm.MockEvidencePool{}) + sm.SaveState(db, state) // let's add some blocks in for blockHeight := int64(1); blockHeight <= maxBlockHeight; blockHeight++ { diff --git a/consensus/byzantine_test.go b/consensus/byzantine_test.go index 0c1b8856..d181fb1a 100644 --- a/consensus/byzantine_test.go +++ b/consensus/byzantine_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/require" cmn "github.com/tendermint/tendermint/libs/common" "github.com/tendermint/tendermint/p2p" + sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" ) @@ -69,7 +70,7 @@ func TestByzantine(t *testing.T) { blocksSubs[i], err = eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock) require.NoError(t, err) - conR := NewConsensusReactor(css[i], true) // so we dont start the consensus states + conR := NewConsensusReactor(css[i], true) // so we don't start the consensus states conR.SetLogger(logger.With("validator", i)) conR.SetEventBus(eventBus) @@ -81,6 +82,7 @@ func TestByzantine(t *testing.T) { } reactors[i] = conRI + sm.SaveState(css[i].blockExec.DB(), css[i].state) //for save height 1's validators info } defer func() { diff --git a/consensus/common_test.go b/consensus/common_test.go index 8f305139..a0c179ae 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -14,6 +14,8 @@ import ( "github.com/go-kit/kit/log/term" + "path" + abcicli "github.com/tendermint/tendermint/abci/client" "github.com/tendermint/tendermint/abci/example/counter" "github.com/tendermint/tendermint/abci/example/kvstore" @@ -119,6 +121,24 @@ func incrementRound(vss ...*validatorStub) { } } +type ValidatorStubsByAddress []*validatorStub + +func (vss ValidatorStubsByAddress) Len() int { + return len(vss) +} + +func (vss ValidatorStubsByAddress) Less(i, j int) bool { + return bytes.Compare(vss[i].GetPubKey().Address(), vss[j].GetPubKey().Address()) == -1 +} + +func (vss ValidatorStubsByAddress) Swap(i, j int) { + it := vss[i] + vss[i] = vss[j] + vss[i].Index = i + vss[j] = it + vss[j].Index = j +} + //------------------------------------------------------------------------------- // Functions for transitioning the consensus state @@ -228,7 +248,7 @@ func validatePrevoteAndPrecommit(t *testing.T, cs *ConsensusState, thisRound, lo } func subscribeToVoter(cs *ConsensusState, addr []byte) <-chan tmpubsub.Message { - votesSub, err := cs.eventBus.Subscribe(context.Background(), testSubscriber, types.EventQueryVote) + votesSub, err := cs.eventBus.SubscribeUnbuffered(context.Background(), testSubscriber, types.EventQueryVote) if err != nil { panic(fmt.Sprintf("failed to subscribe %s to %v", testSubscriber, types.EventQueryVote)) } @@ -278,7 +298,8 @@ func newConsensusStateWithConfigAndBlockStore(thisConfig *cfg.Config, state sm.S evpool := sm.MockEvidencePool{} // Make ConsensusState - stateDB := dbm.NewMemDB() + stateDB := blockDB + sm.SaveState(stateDB, state) //for save height 1's validators info blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyAppConnCon, mempool, evpool) cs := NewConsensusState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool) cs.SetLogger(log.TestingLogger().With("module", "consensus")) @@ -564,7 +585,7 @@ func randConsensusNet(nValidators int, testName string, tickerFunc func() Timeou vals := types.TM2PB.ValidatorUpdates(state.Validators) app.InitChain(abci.RequestInitChain{Validators: vals}) - css[i] = newConsensusStateWithConfig(thisConfig, state, privVals[i], app) + css[i] = newConsensusStateWithConfigAndBlockStore(thisConfig, state, privVals[i], app, stateDB) css[i].SetTimeoutTicker(tickerFunc()) css[i].SetLogger(logger.With("validator", i, "module", "consensus")) } @@ -576,12 +597,11 @@ func randConsensusNet(nValidators int, testName string, tickerFunc func() Timeou } // nPeers = nValidators + nNotValidator -func randConsensusNetWithPeers(nValidators, nPeers int, testName string, tickerFunc func() TimeoutTicker, - appFunc func() abci.Application) ([]*ConsensusState, cleanupFunc) { - +func randConsensusNetWithPeers(nValidators, nPeers int, testName string, tickerFunc func() TimeoutTicker, appFunc func(string) abci.Application) ([]*ConsensusState, *types.GenesisDoc, *cfg.Config, cleanupFunc) { genDoc, privVals := randGenesisDoc(nValidators, false, testMinPower) css := make([]*ConsensusState, nPeers) logger := consensusLogger() + var peer0Config *cfg.Config configRootDirs := make([]string, 0, nPeers) for i := 0; i < nPeers; i++ { stateDB := dbm.NewMemDB() // each state needs its own db @@ -589,6 +609,9 @@ func randConsensusNetWithPeers(nValidators, nPeers int, testName string, tickerF thisConfig := ResetConfig(fmt.Sprintf("%s_%d", testName, i)) configRootDirs = append(configRootDirs, thisConfig.RootDir) ensureDir(filepath.Dir(thisConfig.Consensus.WalFile()), 0700) // dir for wal + if i == 0 { + peer0Config = thisConfig + } var privVal types.PrivValidator if i < nValidators { privVal = privVals[i] @@ -605,15 +628,19 @@ func randConsensusNetWithPeers(nValidators, nPeers int, testName string, tickerF privVal = privval.GenFilePV(tempKeyFile.Name(), tempStateFile.Name()) } - app := appFunc() + app := appFunc(path.Join(config.DBDir(), fmt.Sprintf("%s_%d", testName, i))) vals := types.TM2PB.ValidatorUpdates(state.Validators) + if _, ok := app.(*kvstore.PersistentKVStoreApplication); ok { + state.Version.Consensus.App = kvstore.ProtocolVersion //simulate handshake, receive app version. If don't do this, replay test will fail + } app.InitChain(abci.RequestInitChain{Validators: vals}) + //sm.SaveState(stateDB,state) //height 1's validatorsInfo already saved in LoadStateFromDBOrGenesisDoc above css[i] = newConsensusStateWithConfig(thisConfig, state, privVal, app) css[i].SetTimeoutTicker(tickerFunc()) css[i].SetLogger(logger.With("validator", i, "module", "consensus")) } - return css, func() { + return css, genDoc, peer0Config, func() { for _, dir := range configRootDirs { os.RemoveAll(dir) } @@ -719,3 +746,7 @@ func newPersistentKVStore() abci.Application { } return kvstore.NewPersistentKVStoreApplication(dir) } + +func newPersistentKVStoreWithPath(dbDir string) abci.Application { + return kvstore.NewPersistentKVStoreApplication(dbDir) +} diff --git a/consensus/mempool_test.go b/consensus/mempool_test.go index e7669b17..1baa92b8 100644 --- a/consensus/mempool_test.go +++ b/consensus/mempool_test.go @@ -11,6 +11,7 @@ import ( "github.com/tendermint/tendermint/abci/example/code" abci "github.com/tendermint/tendermint/abci/types" + dbm "github.com/tendermint/tendermint/libs/db" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" ) @@ -106,7 +107,9 @@ func deliverTxsRange(cs *ConsensusState, start, end int) { func TestMempoolTxConcurrentWithCommit(t *testing.T) { state, privVals := randGenesisState(1, false, 10) - cs := newConsensusState(state, privVals[0], NewCounterApplication()) + blockDB := dbm.NewMemDB() + cs := newConsensusStateWithConfigAndBlockStore(config, state, privVals[0], NewCounterApplication(), blockDB) + sm.SaveState(blockDB, state) height, round := cs.Height, cs.Round newBlockCh := subscribe(cs.eventBus, types.EventQueryNewBlock) @@ -129,7 +132,9 @@ func TestMempoolTxConcurrentWithCommit(t *testing.T) { func TestMempoolRmBadTx(t *testing.T) { state, privVals := randGenesisState(1, false, 10) app := NewCounterApplication() - cs := newConsensusState(state, privVals[0], app) + blockDB := dbm.NewMemDB() + cs := newConsensusStateWithConfigAndBlockStore(config, state, privVals[0], app, blockDB) + sm.SaveState(blockDB, state) // increment the counter by 1 txBytes := make([]byte, 8) diff --git a/consensus/reactor_test.go b/consensus/reactor_test.go index dfd6f250..bf58a8ee 100644 --- a/consensus/reactor_test.go +++ b/consensus/reactor_test.go @@ -51,6 +51,10 @@ func startConsensusNet(t *testing.T, css []*ConsensusState, N int) ( blocksSub, err := eventBuses[i].Subscribe(context.Background(), testSubscriber, types.EventQueryNewBlock) require.NoError(t, err) blocksSubs = append(blocksSubs, blocksSub) + + if css[i].state.LastBlockHeight == 0 { //simulate handle initChain in handshake + sm.SaveState(css[i].blockExec.DB(), css[i].state) + } } // make connected switches and start all reactors p2p.MakeConnectedSwitches(config.P2P, N, func(i int, s *p2p.Switch) *p2p.Switch { @@ -329,7 +333,8 @@ func TestReactorVotingPowerChange(t *testing.T) { func TestReactorValidatorSetChanges(t *testing.T) { nPeers := 7 nVals := 4 - css, cleanup := randConsensusNetWithPeers(nVals, nPeers, "consensus_val_set_changes_test", newMockTickerFunc(true), newPersistentKVStore) + css, _, _, cleanup := randConsensusNetWithPeers(nVals, nPeers, "consensus_val_set_changes_test", newMockTickerFunc(true), newPersistentKVStoreWithPath) + defer cleanup() logger := log.TestingLogger() diff --git a/consensus/replay.go b/consensus/replay.go index 38ed79fc..cc1344eb 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -258,8 +258,10 @@ func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error { ) // Set AppVersion on the state. - h.initialState.Version.Consensus.App = version.Protocol(res.AppVersion) - sm.SaveState(h.stateDB, h.initialState) + if h.initialState.Version.Consensus.App != version.Protocol(res.AppVersion) { + h.initialState.Version.Consensus.App = version.Protocol(res.AppVersion) + sm.SaveState(h.stateDB, h.initialState) + } // Replay blocks up to the latest in the blockstore. _, err = h.ReplayBlocks(h.initialState, appHash, blockHeight, proxyApp) @@ -421,13 +423,12 @@ func (h *Handshaker) replayBlocks(state sm.State, proxyApp proxy.AppConns, appBl for i := appBlockHeight + 1; i <= finalBlock; i++ { h.logger.Info("Applying block", "height", i) block := h.store.LoadBlock(i) - // Extra check to ensure the app was not changed in a way it shouldn't have. if len(appHash) > 0 { assertAppHashEqualsOneFromBlock(appHash, block) } - appHash, err = sm.ExecCommitBlock(proxyApp.Consensus(), block, h.logger, state.LastValidators, h.stateDB) + appHash, err = sm.ExecCommitBlock(proxyApp.Consensus(), block, h.logger, h.stateDB) if err != nil { return nil, err } @@ -517,6 +518,9 @@ type mockProxyApp struct { func (mock *mockProxyApp) DeliverTx(tx []byte) abci.ResponseDeliverTx { r := mock.abciResponses.DeliverTx[mock.txCount] mock.txCount++ + if r == nil { //it could be nil because of amino unMarshall, it will cause an empty ResponseDeliverTx to become nil + return abci.ResponseDeliverTx{} + } return *r } diff --git a/consensus/replay_test.go b/consensus/replay_test.go index b084cef4..6ed36190 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -15,6 +15,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "sort" + "github.com/tendermint/tendermint/abci/example/kvstore" abci "github.com/tendermint/tendermint/abci/types" cfg "github.com/tendermint/tendermint/config" @@ -140,10 +142,10 @@ LOOP: // create consensus state from a clean slate logger := log.NewNopLogger() - stateDB := dbm.NewMemDB() + blockDB := dbm.NewMemDB() + stateDB := blockDB state, _ := sm.MakeGenesisStateFromFile(consensusReplayConfig.GenesisFile()) privValidator := loadPrivValidator(consensusReplayConfig) - blockDB := dbm.NewMemDB() cs := newConsensusStateWithConfigAndBlockStore(consensusReplayConfig, state, privValidator, kvstore.NewKVStoreApplication(), blockDB) cs.SetLogger(logger) @@ -262,7 +264,13 @@ func (w *crashingWAL) Stop() error { return w.next.Stop() } func (w *crashingWAL) Wait() { w.next.Wait() } //------------------------------------------------------------------------------------------ -// Handshake Tests +type testSim struct { + GenesisState sm.State + Config *cfg.Config + Chain []*types.Block + Commits []*types.Commit + CleanupFunc cleanupFunc +} const ( numBlocks = 6 @@ -271,6 +279,8 @@ const ( var ( mempool = sm.MockMempool{} evpool = sm.MockEvidencePool{} + + sim testSim ) //--------------------------------------- @@ -281,44 +291,283 @@ var ( // 2 - save block and committed but state is behind var modes = []uint{0, 1, 2} +// This is actually not a test, it's for storing validator change tx data for testHandshakeReplay +func TestSimulateValidatorsChange(t *testing.T) { + nPeers := 7 + nVals := 4 + css, genDoc, config, cleanup := randConsensusNetWithPeers(nVals, nPeers, "replay_test", newMockTickerFunc(true), newPersistentKVStoreWithPath) + sim.Config = config + sim.GenesisState, _ = sm.MakeGenesisState(genDoc) + sim.CleanupFunc = cleanup + + partSize := types.BlockPartSizeBytes + + newRoundCh := subscribe(css[0].eventBus, types.EventQueryNewRound) + proposalCh := subscribe(css[0].eventBus, types.EventQueryCompleteProposal) + + vss := make([]*validatorStub, nPeers) + for i := 0; i < nPeers; i++ { + vss[i] = NewValidatorStub(css[i].privValidator, i) + } + height, round := css[0].Height, css[0].Round + // start the machine + startTestRound(css[0], height, round) + incrementHeight(vss...) + ensureNewRound(newRoundCh, height, 0) + ensureNewProposal(proposalCh, height, round) + rs := css[0].GetRoundState() + signAddVotes(css[0], types.PrecommitType, rs.ProposalBlock.Hash(), rs.ProposalBlockParts.Header(), vss[1:nVals]...) + ensureNewRound(newRoundCh, height+1, 0) + + //height 2 + height++ + incrementHeight(vss...) + newValidatorPubKey1 := css[nVals].privValidator.GetPubKey() + valPubKey1ABCI := types.TM2PB.PubKey(newValidatorPubKey1) + newValidatorTx1 := kvstore.MakeValSetChangeTx(valPubKey1ABCI, testMinPower) + err := assertMempool(css[0].txNotifier).CheckTx(newValidatorTx1, nil) + assert.Nil(t, err) + propBlock, _ := css[0].createProposalBlock() //changeProposer(t, cs1, vs2) + propBlockParts := propBlock.MakePartSet(partSize) + blockID := types.BlockID{Hash: propBlock.Hash(), PartsHeader: propBlockParts.Header()} + proposal := types.NewProposal(vss[1].Height, round, -1, blockID) + if err := vss[1].SignProposal(config.ChainID(), proposal); err != nil { + t.Fatal("failed to sign bad proposal", err) + } + + // set the proposal block + if err := css[0].SetProposalAndBlock(proposal, propBlock, propBlockParts, "some peer"); err != nil { + t.Fatal(err) + } + ensureNewProposal(proposalCh, height, round) + rs = css[0].GetRoundState() + signAddVotes(css[0], types.PrecommitType, rs.ProposalBlock.Hash(), rs.ProposalBlockParts.Header(), vss[1:nVals]...) + ensureNewRound(newRoundCh, height+1, 0) + + //height 3 + height++ + incrementHeight(vss...) + updateValidatorPubKey1 := css[nVals].privValidator.GetPubKey() + updatePubKey1ABCI := types.TM2PB.PubKey(updateValidatorPubKey1) + updateValidatorTx1 := kvstore.MakeValSetChangeTx(updatePubKey1ABCI, 25) + err = assertMempool(css[0].txNotifier).CheckTx(updateValidatorTx1, nil) + assert.Nil(t, err) + propBlock, _ = css[0].createProposalBlock() //changeProposer(t, cs1, vs2) + propBlockParts = propBlock.MakePartSet(partSize) + blockID = types.BlockID{Hash: propBlock.Hash(), PartsHeader: propBlockParts.Header()} + proposal = types.NewProposal(vss[2].Height, round, -1, blockID) + if err := vss[2].SignProposal(config.ChainID(), proposal); err != nil { + t.Fatal("failed to sign bad proposal", err) + } + + // set the proposal block + if err := css[0].SetProposalAndBlock(proposal, propBlock, propBlockParts, "some peer"); err != nil { + t.Fatal(err) + } + ensureNewProposal(proposalCh, height, round) + rs = css[0].GetRoundState() + signAddVotes(css[0], types.PrecommitType, rs.ProposalBlock.Hash(), rs.ProposalBlockParts.Header(), vss[1:nVals]...) + ensureNewRound(newRoundCh, height+1, 0) + + //height 4 + height++ + incrementHeight(vss...) + newValidatorPubKey2 := css[nVals+1].privValidator.GetPubKey() + newVal2ABCI := types.TM2PB.PubKey(newValidatorPubKey2) + newValidatorTx2 := kvstore.MakeValSetChangeTx(newVal2ABCI, testMinPower) + err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx2, nil) + assert.Nil(t, err) + newValidatorPubKey3 := css[nVals+2].privValidator.GetPubKey() + newVal3ABCI := types.TM2PB.PubKey(newValidatorPubKey3) + newValidatorTx3 := kvstore.MakeValSetChangeTx(newVal3ABCI, testMinPower) + err = assertMempool(css[0].txNotifier).CheckTx(newValidatorTx3, nil) + assert.Nil(t, err) + propBlock, _ = css[0].createProposalBlock() //changeProposer(t, cs1, vs2) + propBlockParts = propBlock.MakePartSet(partSize) + blockID = types.BlockID{Hash: propBlock.Hash(), PartsHeader: propBlockParts.Header()} + newVss := make([]*validatorStub, nVals+1) + copy(newVss, vss[:nVals+1]) + sort.Sort(ValidatorStubsByAddress(newVss)) + selfIndex := 0 + for i, vs := range newVss { + if vs.GetPubKey().Equals(css[0].privValidator.GetPubKey()) { + selfIndex = i + break + } + } + + proposal = types.NewProposal(vss[3].Height, round, -1, blockID) + if err := vss[3].SignProposal(config.ChainID(), proposal); err != nil { + t.Fatal("failed to sign bad proposal", err) + } + + // set the proposal block + if err := css[0].SetProposalAndBlock(proposal, propBlock, propBlockParts, "some peer"); err != nil { + t.Fatal(err) + } + ensureNewProposal(proposalCh, height, round) + + removeValidatorTx2 := kvstore.MakeValSetChangeTx(newVal2ABCI, 0) + err = assertMempool(css[0].txNotifier).CheckTx(removeValidatorTx2, nil) + assert.Nil(t, err) + + rs = css[0].GetRoundState() + for i := 0; i < nVals+1; i++ { + if i == selfIndex { + continue + } + signAddVotes(css[0], types.PrecommitType, rs.ProposalBlock.Hash(), rs.ProposalBlockParts.Header(), newVss[i]) + } + + ensureNewRound(newRoundCh, height+1, 0) + + //height 5 + height++ + incrementHeight(vss...) + ensureNewProposal(proposalCh, height, round) + rs = css[0].GetRoundState() + for i := 0; i < nVals+1; i++ { + if i == selfIndex { + continue + } + signAddVotes(css[0], types.PrecommitType, rs.ProposalBlock.Hash(), rs.ProposalBlockParts.Header(), newVss[i]) + } + ensureNewRound(newRoundCh, height+1, 0) + + //height 6 + height++ + incrementHeight(vss...) + removeValidatorTx3 := kvstore.MakeValSetChangeTx(newVal3ABCI, 0) + err = assertMempool(css[0].txNotifier).CheckTx(removeValidatorTx3, nil) + assert.Nil(t, err) + propBlock, _ = css[0].createProposalBlock() //changeProposer(t, cs1, vs2) + propBlockParts = propBlock.MakePartSet(partSize) + blockID = types.BlockID{Hash: propBlock.Hash(), PartsHeader: propBlockParts.Header()} + newVss = make([]*validatorStub, nVals+3) + copy(newVss, vss[:nVals+3]) + sort.Sort(ValidatorStubsByAddress(newVss)) + for i, vs := range newVss { + if vs.GetPubKey().Equals(css[0].privValidator.GetPubKey()) { + selfIndex = i + break + } + } + proposal = types.NewProposal(vss[1].Height, round, -1, blockID) + if err := vss[1].SignProposal(config.ChainID(), proposal); err != nil { + t.Fatal("failed to sign bad proposal", err) + } + + // set the proposal block + if err := css[0].SetProposalAndBlock(proposal, propBlock, propBlockParts, "some peer"); err != nil { + t.Fatal(err) + } + ensureNewProposal(proposalCh, height, round) + rs = css[0].GetRoundState() + for i := 0; i < nVals+3; i++ { + if i == selfIndex { + continue + } + signAddVotes(css[0], types.PrecommitType, rs.ProposalBlock.Hash(), rs.ProposalBlockParts.Header(), newVss[i]) + } + ensureNewRound(newRoundCh, height+1, 0) + + sim.Chain = make([]*types.Block, 0) + sim.Commits = make([]*types.Commit, 0) + for i := 1; i <= numBlocks; i++ { + sim.Chain = append(sim.Chain, css[0].blockStore.LoadBlock(int64(i))) + sim.Commits = append(sim.Commits, css[0].blockStore.LoadBlockCommit(int64(i))) + } +} + // Sync from scratch func TestHandshakeReplayAll(t *testing.T) { - for i, m := range modes { - config := ResetConfig(fmt.Sprintf("%s_%v", t.Name(), i)) - defer os.RemoveAll(config.RootDir) - - testHandshakeReplay(t, config, 0, m) + for _, m := range modes { + testHandshakeReplay(t, config, 0, m, false) + } + for _, m := range modes { + testHandshakeReplay(t, config, 0, m, true) } } // Sync many, not from scratch func TestHandshakeReplaySome(t *testing.T) { - for i, m := range modes { - config := ResetConfig(fmt.Sprintf("%s_%v", t.Name(), i)) - defer os.RemoveAll(config.RootDir) - - testHandshakeReplay(t, config, 1, m) + for _, m := range modes { + testHandshakeReplay(t, config, 1, m, false) + } + for _, m := range modes { + testHandshakeReplay(t, config, 1, m, true) } } // Sync from lagging by one func TestHandshakeReplayOne(t *testing.T) { - for i, m := range modes { - config := ResetConfig(fmt.Sprintf("%s_%v", t.Name(), i)) - defer os.RemoveAll(config.RootDir) - - testHandshakeReplay(t, config, numBlocks-1, m) + for _, m := range modes { + testHandshakeReplay(t, config, numBlocks-1, m, false) + } + for _, m := range modes { + testHandshakeReplay(t, config, numBlocks-1, m, true) } } // Sync from caught up func TestHandshakeReplayNone(t *testing.T) { - for i, m := range modes { - config := ResetConfig(fmt.Sprintf("%s_%v", t.Name(), i)) - defer os.RemoveAll(config.RootDir) - - testHandshakeReplay(t, config, numBlocks, m) + for _, m := range modes { + testHandshakeReplay(t, config, numBlocks, m, false) } + for _, m := range modes { + testHandshakeReplay(t, config, numBlocks, m, true) + } +} + +// Test mockProxyApp should not panic when app return ABCIResponses with some empty ResponseDeliverTx +func TestMockProxyApp(t *testing.T) { + sim.CleanupFunc() //clean the test env created in TestSimulateValidatorsChange + logger := log.TestingLogger() + var validTxs, invalidTxs = 0, 0 + txIndex := 0 + + assert.NotPanics(t, func() { + abciResWithEmptyDeliverTx := new(sm.ABCIResponses) + abciResWithEmptyDeliverTx.DeliverTx = make([]*abci.ResponseDeliverTx, 0) + abciResWithEmptyDeliverTx.DeliverTx = append(abciResWithEmptyDeliverTx.DeliverTx, &abci.ResponseDeliverTx{}) + + // called when saveABCIResponses: + bytes := cdc.MustMarshalBinaryBare(abciResWithEmptyDeliverTx) + loadedAbciRes := new(sm.ABCIResponses) + + // this also happens sm.LoadABCIResponses + err := cdc.UnmarshalBinaryBare(bytes, loadedAbciRes) + require.NoError(t, err) + + mock := newMockProxyApp([]byte("mock_hash"), loadedAbciRes) + + abciRes := new(sm.ABCIResponses) + abciRes.DeliverTx = make([]*abci.ResponseDeliverTx, len(loadedAbciRes.DeliverTx)) + // Execute transactions and get hash. + proxyCb := func(req *abci.Request, res *abci.Response) { + switch r := res.Value.(type) { + case *abci.Response_DeliverTx: + // TODO: make use of res.Log + // TODO: make use of this info + // Blocks may include invalid txs. + txRes := r.DeliverTx + if txRes.Code == abci.CodeTypeOK { + validTxs++ + } else { + logger.Debug("Invalid tx", "code", txRes.Code, "log", txRes.Log) + invalidTxs++ + } + abciRes.DeliverTx[txIndex] = txRes + txIndex++ + } + } + mock.SetResponseCallback(proxyCb) + + someTx := []byte("tx") + mock.DeliverTxAsync(someTx) + }) + assert.True(t, validTxs == 1) + assert.True(t, invalidTxs == 0) } func tempWALWithData(data []byte) string { @@ -336,54 +585,68 @@ func tempWALWithData(data []byte) string { return walFile.Name() } -// Make some blocks. Start a fresh app and apply nBlocks blocks. Then restart -// the app and sync it up with the remaining blocks. -func testHandshakeReplay(t *testing.T, config *cfg.Config, nBlocks int, mode uint) { - walBody, err := WALWithNBlocks(t, numBlocks) - require.NoError(t, err) - walFile := tempWALWithData(walBody) - config.Consensus.SetWalFile(walFile) +// Make some blocks. Start a fresh app and apply nBlocks blocks. Then restart the app and sync it up with the remaining blocks +func testHandshakeReplay(t *testing.T, config *cfg.Config, nBlocks int, mode uint, testValidatorsChange bool) { + var chain []*types.Block + var commits []*types.Commit + var store *mockBlockStore + var stateDB dbm.DB + var genisisState sm.State + if testValidatorsChange { + testConfig := ResetConfig(fmt.Sprintf("%s_%v_m", t.Name(), mode)) + defer os.RemoveAll(testConfig.RootDir) + stateDB = dbm.NewMemDB() + genisisState = sim.GenesisState + config = sim.Config + chain = sim.Chain + commits = sim.Commits + store = newMockBlockStore(config, genisisState.ConsensusParams) + } else { //test single node + testConfig := ResetConfig(fmt.Sprintf("%s_%v_s", t.Name(), mode)) + defer os.RemoveAll(testConfig.RootDir) + walBody, err := WALWithNBlocks(t, numBlocks) + require.NoError(t, err) + walFile := tempWALWithData(walBody) + config.Consensus.SetWalFile(walFile) - privVal := privval.LoadFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile()) + privVal := privval.LoadFilePV(config.PrivValidatorKeyFile(), config.PrivValidatorStateFile()) - wal, err := NewWAL(walFile) - require.NoError(t, err) - wal.SetLogger(log.TestingLogger()) - err = wal.Start() - require.NoError(t, err) - defer wal.Stop() + wal, err := NewWAL(walFile) + require.NoError(t, err) + wal.SetLogger(log.TestingLogger()) + err = wal.Start() + require.NoError(t, err) + defer wal.Stop() - chain, commits, err := makeBlockchainFromWAL(wal) - require.NoError(t, err) - - stateDB, state, store := stateAndStore(config, privVal.GetPubKey(), kvstore.ProtocolVersion) + chain, commits, err = makeBlockchainFromWAL(wal) + require.NoError(t, err) + stateDB, genisisState, store = stateAndStore(config, privVal.GetPubKey(), kvstore.ProtocolVersion) + } store.chain = chain store.commits = commits + state := genisisState.Copy() // run the chain through state.ApplyBlock to build up the tendermint state - clientCreator := proxy.NewLocalClientCreator(kvstore.NewPersistentKVStoreApplication(filepath.Join(config.DBDir(), "1"))) - proxyApp := proxy.NewAppConns(clientCreator) - err = proxyApp.Start() - require.NoError(t, err) - state = buildTMStateFromChain(config, stateDB, state, chain, proxyApp, mode) - proxyApp.Stop() + state = buildTMStateFromChain(config, stateDB, state, chain, nBlocks, mode) latestAppHash := state.AppHash // make a new client creator - kvstoreApp := kvstore.NewPersistentKVStoreApplication(filepath.Join(config.DBDir(), "2")) + kvstoreApp := kvstore.NewPersistentKVStoreApplication(filepath.Join(config.DBDir(), fmt.Sprintf("replay_test_%d_%d_a", nBlocks, mode))) + clientCreator2 := proxy.NewLocalClientCreator(kvstoreApp) if nBlocks > 0 { // run nBlocks against a new client to build up the app state. // use a throwaway tendermint state proxyApp := proxy.NewAppConns(clientCreator2) - stateDB, state, _ := stateAndStore(config, privVal.GetPubKey(), kvstore.ProtocolVersion) - buildAppStateFromChain(proxyApp, stateDB, state, chain, nBlocks, mode) + stateDB1 := dbm.NewMemDB() + sm.SaveState(stateDB1, genisisState) + buildAppStateFromChain(proxyApp, stateDB1, genisisState, chain, nBlocks, mode) } // now start the app using the handshake - it should sync genDoc, _ := sm.MakeGenesisDocFromFile(config.GenesisFile()) handshaker := NewHandshaker(stateDB, state, store, genDoc) - proxyApp = proxy.NewAppConns(clientCreator2) + proxyApp := proxy.NewAppConns(clientCreator2) if err := proxyApp.Start(); err != nil { t.Fatalf("Error starting proxy app connections: %v", err) } @@ -435,12 +698,14 @@ func buildAppStateFromChain(proxyApp proxy.AppConns, stateDB dbm.DB, } defer proxyApp.Stop() + state.Version.Consensus.App = kvstore.ProtocolVersion //simulate handshake, receive app version validators := types.TM2PB.ValidatorUpdates(state.Validators) if _, err := proxyApp.Consensus().InitChainSync(abci.RequestInitChain{ Validators: validators, }); err != nil { panic(err) } + sm.SaveState(stateDB, state) //save height 1's validatorsInfo switch mode { case 0: @@ -463,15 +728,23 @@ func buildAppStateFromChain(proxyApp proxy.AppConns, stateDB dbm.DB, } -func buildTMStateFromChain(config *cfg.Config, stateDB dbm.DB, state sm.State, - chain []*types.Block, proxyApp proxy.AppConns, mode uint) sm.State { +func buildTMStateFromChain(config *cfg.Config, stateDB dbm.DB, state sm.State, chain []*types.Block, nBlocks int, mode uint) sm.State { + // run the whole chain against this client to build up the tendermint state + clientCreator := proxy.NewLocalClientCreator(kvstore.NewPersistentKVStoreApplication(filepath.Join(config.DBDir(), fmt.Sprintf("replay_test_%d_%d_t", nBlocks, mode)))) + proxyApp := proxy.NewAppConns(clientCreator) + if err := proxyApp.Start(); err != nil { + panic(err) + } + defer proxyApp.Stop() + state.Version.Consensus.App = kvstore.ProtocolVersion //simulate handshake, receive app version validators := types.TM2PB.ValidatorUpdates(state.Validators) if _, err := proxyApp.Consensus().InitChainSync(abci.RequestInitChain{ Validators: validators, }); err != nil { panic(err) } + sm.SaveState(stateDB, state) //save height 1's validatorsInfo switch mode { case 0: @@ -743,6 +1016,7 @@ func stateAndStore(config *cfg.Config, pubKey crypto.PubKey, appVersion version. state, _ := sm.MakeGenesisStateFromFile(config.GenesisFile()) state.Version.Consensus.App = appVersion store := newMockBlockStore(config, state.ConsensusParams) + sm.SaveState(stateDB, state) return stateDB, state, store } diff --git a/consensus/state_test.go b/consensus/state_test.go index 67b5cfbd..87e351dc 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -239,7 +239,7 @@ func TestStateFullRound1(t *testing.T) { cs.SetEventBus(eventBus) eventBus.Start() - voteCh := subscribe(cs.eventBus, types.EventQueryVote) + voteCh := subscribeUnBuffered(cs.eventBus, types.EventQueryVote) propCh := subscribe(cs.eventBus, types.EventQueryCompleteProposal) newRoundCh := subscribe(cs.eventBus, types.EventQueryNewRound) @@ -267,7 +267,7 @@ func TestStateFullRoundNil(t *testing.T) { cs, vss := randConsensusState(1) height, round := cs.Height, cs.Round - voteCh := subscribe(cs.eventBus, types.EventQueryVote) + voteCh := subscribeUnBuffered(cs.eventBus, types.EventQueryVote) cs.enterPrevote(height, round) cs.startRoutines(4) @@ -286,7 +286,7 @@ func TestStateFullRound2(t *testing.T) { vs2 := vss[1] height, round := cs1.Height, cs1.Round - voteCh := subscribe(cs1.eventBus, types.EventQueryVote) + voteCh := subscribeUnBuffered(cs1.eventBus, types.EventQueryVote) newBlockCh := subscribe(cs1.eventBus, types.EventQueryNewBlock) // start round and wait for propose and prevote @@ -330,7 +330,7 @@ func TestStateLockNoPOL(t *testing.T) { timeoutProposeCh := subscribe(cs1.eventBus, types.EventQueryTimeoutPropose) timeoutWaitCh := subscribe(cs1.eventBus, types.EventQueryTimeoutWait) - voteCh := subscribe(cs1.eventBus, types.EventQueryVote) + voteCh := subscribeUnBuffered(cs1.eventBus, types.EventQueryVote) proposalCh := subscribe(cs1.eventBus, types.EventQueryCompleteProposal) newRoundCh := subscribe(cs1.eventBus, types.EventQueryNewRound) @@ -1623,3 +1623,12 @@ func subscribe(eventBus *types.EventBus, q tmpubsub.Query) <-chan tmpubsub.Messa } return sub.Out() } + +// subscribe subscribes test client to the given query and returns a channel with cap = 0. +func subscribeUnBuffered(eventBus *types.EventBus, q tmpubsub.Query) <-chan tmpubsub.Message { + sub, err := eventBus.SubscribeUnbuffered(context.Background(), testSubscriber, q) + if err != nil { + panic(fmt.Sprintf("failed to subscribe %s to %v", testSubscriber, q)) + } + return sub.Out() +} diff --git a/consensus/wal_generator.go b/consensus/wal_generator.go index 1a4cfb9f..c007a21c 100644 --- a/consensus/wal_generator.go +++ b/consensus/wal_generator.go @@ -45,13 +45,14 @@ func WALGenerateNBlocks(t *testing.T, wr io.Writer, numBlocks int) (err error) { if err != nil { return errors.Wrap(err, "failed to read genesis file") } - stateDB := db.NewMemDB() blockStoreDB := db.NewMemDB() + stateDB := blockStoreDB state, err := sm.MakeGenesisState(genDoc) if err != nil { return errors.Wrap(err, "failed to make genesis state") } state.Version.Consensus.App = kvstore.ProtocolVersion + sm.SaveState(stateDB, state) blockStore := bc.NewBlockStore(blockStoreDB) proxyApp := proxy.NewAppConns(proxy.NewLocalClientCreator(app)) proxyApp.SetLogger(logger.With("module", "proxy")) diff --git a/libs/pubsub/pubsub_test.go b/libs/pubsub/pubsub_test.go index 88447756..74af431f 100644 --- a/libs/pubsub/pubsub_test.go +++ b/libs/pubsub/pubsub_test.go @@ -46,13 +46,16 @@ func TestSubscribe(t *testing.T) { err = s.Publish(ctx, "Asylum") assert.NoError(t, err) + + err = s.Publish(ctx, "Ivan") + assert.NoError(t, err) }() select { case <-published: assertReceive(t, "Quicksilver", subscription.Out()) assertCancelled(t, subscription, pubsub.ErrOutOfCapacity) - case <-time.After(100 * time.Millisecond): + case <-time.After(3 * time.Second): t.Fatal("Expected Publish(Asylum) not to block") } } @@ -101,7 +104,7 @@ func TestSubscribeUnbuffered(t *testing.T) { select { case <-published: t.Fatal("Expected Publish(Darkhawk) to block") - case <-time.After(100 * time.Millisecond): + case <-time.After(3 * time.Second): assertReceive(t, "Ultron", subscription.Out()) assertReceive(t, "Darkhawk", subscription.Out()) } diff --git a/state/execution.go b/state/execution.go index 3a11ecca..24532552 100644 --- a/state/execution.go +++ b/state/execution.go @@ -66,6 +66,10 @@ func NewBlockExecutor(db dbm.DB, logger log.Logger, proxyApp proxy.AppConnConsen return res } +func (blockExec *BlockExecutor) DB() dbm.DB { + return blockExec.db +} + // SetEventBus - sets the event bus for publishing block related events. // If not called, it defaults to types.NopEventBus. func (blockExec *BlockExecutor) SetEventBus(eventBus types.BlockEventPublisher) { @@ -116,7 +120,7 @@ func (blockExec *BlockExecutor) ApplyBlock(state State, blockID types.BlockID, b } startTime := time.Now().UnixNano() - abciResponses, err := execBlockOnProxyApp(blockExec.logger, blockExec.proxyApp, block, state.LastValidators, blockExec.db) + abciResponses, err := execBlockOnProxyApp(blockExec.logger, blockExec.proxyApp, block, blockExec.db) endTime := time.Now().UnixNano() blockExec.metrics.BlockProcessingTime.Observe(float64(endTime-startTime) / 1000000) if err != nil { @@ -233,7 +237,6 @@ func execBlockOnProxyApp( logger log.Logger, proxyAppConn proxy.AppConnConsensus, block *types.Block, - lastValSet *types.ValidatorSet, stateDB dbm.DB, ) (*ABCIResponses, error) { var validTxs, invalidTxs = 0, 0 @@ -261,7 +264,7 @@ func execBlockOnProxyApp( } proxyAppConn.SetResponseCallback(proxyCb) - commitInfo, byzVals := getBeginBlockValidatorInfo(block, lastValSet, stateDB) + commitInfo, byzVals := getBeginBlockValidatorInfo(block, stateDB) // Begin block var err error @@ -296,22 +299,31 @@ func execBlockOnProxyApp( return abciResponses, nil } -func getBeginBlockValidatorInfo(block *types.Block, lastValSet *types.ValidatorSet, stateDB dbm.DB) (abci.LastCommitInfo, []abci.Evidence) { - - // Sanity check that commit length matches validator set size - - // only applies after first block +func getBeginBlockValidatorInfo(block *types.Block, stateDB dbm.DB) (abci.LastCommitInfo, []abci.Evidence) { + voteInfos := make([]abci.VoteInfo, block.LastCommit.Size()) + byzVals := make([]abci.Evidence, len(block.Evidence.Evidence)) + var lastValSet *types.ValidatorSet + var err error if block.Height > 1 { - precommitLen := len(block.LastCommit.Precommits) + lastValSet, err = LoadValidators(stateDB, block.Height-1) + if err != nil { + panic(err) // shouldn't happen + } + + // Sanity check that commit length matches validator set size - + // only applies after first block + + precommitLen := block.LastCommit.Size() valSetLen := len(lastValSet.Validators) if precommitLen != valSetLen { // sanity check panic(fmt.Sprintf("precommit length (%d) doesn't match valset length (%d) at height %d\n\n%v\n\n%v", precommitLen, valSetLen, block.Height, block.LastCommit.Precommits, lastValSet.Validators)) } + } else { + lastValSet = types.NewValidatorSet(nil) } - // Collect the vote info (list of validators and whether or not they signed). - voteInfos := make([]abci.VoteInfo, len(lastValSet.Validators)) for i, val := range lastValSet.Validators { var vote *types.CommitSig if i < len(block.LastCommit.Precommits) { @@ -324,12 +336,6 @@ func getBeginBlockValidatorInfo(block *types.Block, lastValSet *types.ValidatorS voteInfos[i] = voteInfo } - commitInfo := abci.LastCommitInfo{ - Round: int32(block.LastCommit.Round()), - Votes: voteInfos, - } - - byzVals := make([]abci.Evidence, len(block.Evidence.Evidence)) for i, ev := range block.Evidence.Evidence { // We need the validator set. We already did this in validateBlock. // TODO: Should we instead cache the valset in the evidence itself and add @@ -341,6 +347,10 @@ func getBeginBlockValidatorInfo(block *types.Block, lastValSet *types.ValidatorS byzVals[i] = types.TM2PB.Evidence(ev, valset, block.Time) } + commitInfo := abci.LastCommitInfo{ + Round: int32(block.LastCommit.Round()), + Votes: voteInfos, + } return commitInfo, byzVals } @@ -469,10 +479,9 @@ func ExecCommitBlock( appConnConsensus proxy.AppConnConsensus, block *types.Block, logger log.Logger, - lastValSet *types.ValidatorSet, stateDB dbm.DB, ) ([]byte, error) { - _, err := execBlockOnProxyApp(logger, appConnConsensus, block, lastValSet, stateDB) + _, err := execBlockOnProxyApp(logger, appConnConsensus, block, stateDB) if err != nil { logger.Error("Error executing block on proxy app", "height", block.Height, "err", err) return nil, err diff --git a/state/execution_test.go b/state/execution_test.go index a9fdfe27..092fbd68 100644 --- a/state/execution_test.go +++ b/state/execution_test.go @@ -85,7 +85,7 @@ func TestBeginBlockValidators(t *testing.T) { // block for height 2 block, _ := state.MakeBlock(2, makeTxs(2), lastCommit, nil, state.Validators.GetProposer().Address) - _, err = ExecCommitBlock(proxyApp.Consensus(), block, log.TestingLogger(), state.Validators, stateDB) + _, err = ExecCommitBlock(proxyApp.Consensus(), block, log.TestingLogger(), stateDB) require.Nil(t, err, tc.desc) // -> app receives a list of validators with a bool indicating if they signed @@ -146,7 +146,7 @@ func TestBeginBlockByzantineValidators(t *testing.T) { block, _ := state.MakeBlock(10, makeTxs(2), lastCommit, nil, state.Validators.GetProposer().Address) block.Time = now block.Evidence.Evidence = tc.evidence - _, err = ExecCommitBlock(proxyApp.Consensus(), block, log.TestingLogger(), state.Validators, stateDB) + _, err = ExecCommitBlock(proxyApp.Consensus(), block, log.TestingLogger(), stateDB) require.Nil(t, err, tc.desc) // -> app must receive an index of the byzantine validator