diff --git a/consensus/common_test.go b/consensus/common_test.go index e6bba8e6..6ca96d7e 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -12,6 +12,7 @@ import ( cfg "github.com/tendermint/go-config" dbm "github.com/tendermint/go-db" bc "github.com/tendermint/tendermint/blockchain" + "github.com/tendermint/tendermint/config/tendermint_test" mempl "github.com/tendermint/tendermint/mempool" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" @@ -207,7 +208,7 @@ func fixedConsensusStateDummy() *ConsensusState { return cs } -func newConsensusState(state *sm.State, pv *types.PrivValidator, app tmsp.Application) *ConsensusState { +func newConsensusStateWithConfig(thisConfig cfg.Config, state *sm.State, pv *types.PrivValidator, app tmsp.Application) *ConsensusState { // Get BlockStore blockDB := dbm.NewMemDB() blockStore := bc.NewBlockStore(blockDB) @@ -218,10 +219,10 @@ func newConsensusState(state *sm.State, pv *types.PrivValidator, app tmsp.Applic proxyAppConnCon := tmspcli.NewLocalClient(mtx, app) // Make Mempool - mempool := mempl.NewMempool(config, proxyAppConnMem) + mempool := mempl.NewMempool(thisConfig, proxyAppConnMem) // Make ConsensusReactor - cs := NewConsensusState(config, state, proxyAppConnCon, blockStore, mempool) + cs := NewConsensusState(thisConfig, state, proxyAppConnCon, blockStore, mempool) cs.SetPrivValidator(pv) evsw := types.NewEventSwitch() @@ -230,6 +231,10 @@ func newConsensusState(state *sm.State, pv *types.PrivValidator, app tmsp.Applic return cs } +func newConsensusState(state *sm.State, pv *types.PrivValidator, app tmsp.Application) *ConsensusState { + return newConsensusStateWithConfig(config, state, pv, app) +} + func randConsensusState(nValidators int) (*ConsensusState, []*validatorStub) { // Get State state, privVals := randGenesisState(nValidators, false, 10) @@ -254,13 +259,10 @@ func randConsensusNet(nValidators int) []*ConsensusState { db := dbm.NewMemDB() // each state needs its own db state := sm.MakeGenesisState(db, genDoc) state.Save() - css[i] = newConsensusState(state, privVals[i], counter.NewCounterApplication(true)) + thisConfig := tendermint_test.ResetConfig(Fmt("consensus_reactor_test_%d", i)) + EnsureDir(thisConfig.GetString("db_dir"), 0700) // dir for wal + css[i] = newConsensusStateWithConfig(thisConfig, state, privVals[i], counter.NewCounterApplication(true)) } - - // we use memdb, but need a dir for the cswal. - // in this case they all write to the same one but we dont care - // NOTE: they all share a pointer to the same config object! - EnsureDir(css[0].config.GetString("db_dir"), 0700) return css } diff --git a/consensus/reactor.go b/consensus/reactor.go index d5930be9..60c35fea 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -400,7 +400,7 @@ OUTER_LOOP: // Ensure that the peer's PartSetHeader is correct blockMeta := conR.blockStore.LoadBlockMeta(prs.Height) if blockMeta == nil { - log.Warn("Failed to load block meta", "peer height", prs.Height, "our height", rs.Height) + log.Warn("Failed to load block meta", "peer height", prs.Height, "our height", rs.Height, "blockstore height", conR.blockStore.Height(), "pv", conR.conS.privValidator) time.Sleep(peerGossipSleepDuration) continue OUTER_LOOP } else if !blockMeta.PartsHeader.Equals(prs.ProposalBlockPartsHeader) { diff --git a/consensus/reactor_test.go b/consensus/reactor_test.go index b5fbe44b..becb73a8 100644 --- a/consensus/reactor_test.go +++ b/consensus/reactor_test.go @@ -1,20 +1,19 @@ package consensus import ( - "fmt" - "net" + "bytes" "sync" "testing" "time" "github.com/tendermint/tendermint/config/tendermint_test" - "github.com/tendermint/ed25519" . "github.com/tendermint/go-common" cfg "github.com/tendermint/go-config" "github.com/tendermint/go-crypto" dbm "github.com/tendermint/go-db" "github.com/tendermint/go-events" + "github.com/tendermint/go-logger" "github.com/tendermint/go-p2p" bc "github.com/tendermint/tendermint/blockchain" "github.com/tendermint/tendermint/types" @@ -25,7 +24,8 @@ func init() { } func resetConfigTimeouts() { - config.Set("log_level", "notice") + logger.SetLogLevel("notice") + //config.Set("log_level", "notice") config.Set("timeout_propose", 2000) // config.Set("timeout_propose_delta", 500) // config.Set("timeout_prevote", 1000) @@ -59,7 +59,7 @@ func TestReactor(t *testing.T) { p2p.MakeConnectedSwitches(N, func(i int, s *p2p.Switch) *p2p.Switch { s.AddReactor("CONSENSUS", reactors[i]) return s - }, net.Pipe) + }, p2p.Connect2Switches) // wait till everyone makes the first new block wg := new(sync.WaitGroup) @@ -85,6 +85,12 @@ func TestReactor(t *testing.T) { } } +// 4 validators. 1 is byzantine. The other three are partitioned into A (1 val) and B (2 vals). +// byzantine validator sends conflicting proposals into A and B, +// and prevotes/precommits on both of them. +// B sees a commit, A doesn't. +// Byzantine validator refuses to prevote. +// Heal partition and ensure A sees the commit func TestByzantine(t *testing.T) { resetConfigTimeouts() N := 4 @@ -95,48 +101,84 @@ func TestByzantine(t *testing.T) { switches[i] = p2p.NewSwitch(cfg.NewMapConfig(nil)) } - reactors := make([]*ConsensusReactor, N) + reactors := make([]p2p.Reactor, N) eventChans := make([]chan interface{}, N) for i := 0; i < N; i++ { blockStoreDB := dbm.NewDB(Fmt("blockstore%d", i), config.GetString("db_backend"), config.GetString("db_dir")) blockStore := bc.NewBlockStore(blockStoreDB) + var privVal PrivValidator + privVal = css[i].privValidator if i == 0 { + privVal = NewByzantinePrivValidator(privVal.(*types.PrivValidator)) // make byzantine css[i].decideProposal = func(j int) func(int, int) { return func(height, round int) { - fmt.Println("hmph", j) byzantineDecideProposalFunc(height, round, css[j], switches[j]) } }(i) css[i].doPrevote = func(height, round int) {} - css[i].setProposal = func(j int) func(proposal *types.Proposal) error { - return func(proposal *types.Proposal) error { - return byzantineSetProposal(proposal, css[j], switches[j]) - } - }(i) } - reactors[i] = NewConsensusReactor(css[i], blockStore, false) - reactors[i].SetPrivValidator(css[i].privValidator) eventSwitch := events.NewEventSwitch() _, err := eventSwitch.Start() if err != nil { t.Fatalf("Failed to start switch: %v", err) } - - reactors[i].SetEventSwitch(eventSwitch) eventChans[i] = subscribeToEvent(eventSwitch, "tester", types.EventStringNewBlock(), 1) + + conR := NewConsensusReactor(css[i], blockStore, false) + conR.SetPrivValidator(privVal) + conR.SetEventSwitch(eventSwitch) + + var conRI p2p.Reactor + conRI = conR + if i == 0 { + conRI = NewByzantineReactor(conR) + } + reactors[i] = conRI } + p2p.MakeConnectedSwitches(N, func(i int, s *p2p.Switch) *p2p.Switch { - s.AddReactor("CONSENSUS", reactors[i]) - return s - }, net.Pipe) + // ignore new switch s, we already made ours + switches[i].AddReactor("CONSENSUS", reactors[i]) + return switches[i] + }, func(sws []*p2p.Switch, i, j int) { + // the network starts partitioned with globally active adversary + if i != 0 { + return + } + p2p.Connect2Switches(sws, i, j) + }) + + // byz proposer sends one block to peers[0] + // and the other block to peers[1] and peers[2]. + // note peers and switches order don't match. + peers := switches[0].Peers().List() + ind0 := getSwitchIndex(switches, peers[0]) + ind1 := getSwitchIndex(switches, peers[1]) + ind2 := getSwitchIndex(switches, peers[2]) + + // connect the 2 peers in the larger partition + p2p.Connect2Switches(switches, ind1, ind2) + + // wait for someone in the big partition to make a block + + select { + case <-eventChans[ind2]: + } + + log.Notice("A block has been committed. Healing partition") + + // connect the partitions + p2p.Connect2Switches(switches, ind0, ind1) + p2p.Connect2Switches(switches, ind0, ind2) // wait till everyone makes the first new block + // (one of them already has) wg := new(sync.WaitGroup) - wg.Add(N) - for i := 0; i < N; i++ { + wg.Add(2) + for i := 1; i < N-1; i++ { go func(j int) { <-eventChans[j] wg.Done() @@ -149,7 +191,7 @@ func TestByzantine(t *testing.T) { close(done) }() - tick := time.NewTicker(time.Second * 3) + tick := time.NewTicker(time.Second * 10) select { case <-done: case <-tick.C: @@ -157,6 +199,16 @@ func TestByzantine(t *testing.T) { } } +func getSwitchIndex(switches []*p2p.Switch, peer *p2p.Peer) int { + for i, s := range switches { + if bytes.Equal(peer.NodeInfo.PubKey.Address(), s.NodeInfo().PubKey.Address()) { + return i + } + } + panic("didnt find peer in switches") + return -1 +} + //------------------------------- // byzantine consensus functions @@ -176,19 +228,22 @@ func byzantineDecideProposalFunc(height, round int, cs *ConsensusState, sw *p2p. proposal2 := types.NewProposal(height, round, blockParts2.Header(), polRound, polBlockID) cs.privValidator.SignProposal(cs.state.ChainID, proposal2) // byzantine doesnt err - log.Notice("Byzantine: broadcasting conflicting proposals") + block1Hash := block1.Hash() + block2Hash := block2.Hash() + // broadcast conflicting proposals/block parts to peers peers := sw.Peers().List() + log.Notice("Byzantine: broadcasting conflicting proposals", "peers", len(peers)) for i, peer := range peers { if i < len(peers)/2 { - go sendProposalAndParts(height, round, cs, peer, proposal1, block1, blockParts1) + go sendProposalAndParts(height, round, cs, peer, proposal1, block1Hash, blockParts1) } else { - go sendProposalAndParts(height, round, cs, peer, proposal2, block2, blockParts2) + go sendProposalAndParts(height, round, cs, peer, proposal2, block2Hash, blockParts2) } } } -func sendProposalAndParts(height, round int, cs *ConsensusState, peer *p2p.Peer, proposal *types.Proposal, block *types.Block, parts *types.PartSet) { +func sendProposalAndParts(height, round int, cs *ConsensusState, peer *p2p.Peer, proposal *types.Proposal, blockHash []byte, parts *types.PartSet) { // proposal msg := &ProposalMessage{Proposal: proposal} peer.Send(DataChannel, struct{ ConsensusMessage }{msg}) @@ -205,55 +260,69 @@ func sendProposalAndParts(height, round int, cs *ConsensusState, peer *p2p.Peer, } // votes - prevote, _ := cs.signVote(types.VoteTypePrevote, block.Hash(), parts.Header()) + cs.mtx.Lock() + prevote, _ := cs.signVote(types.VoteTypePrevote, blockHash, parts.Header()) + precommit, _ := cs.signVote(types.VoteTypePrecommit, blockHash, parts.Header()) + cs.mtx.Unlock() + peer.Send(VoteChannel, struct{ ConsensusMessage }{&VoteMessage{prevote}}) - precommit, _ := cs.signVote(types.VoteTypePrecommit, block.Hash(), parts.Header()) peer.Send(VoteChannel, struct{ ConsensusMessage }{&VoteMessage{precommit}}) } -func byzantineSetProposal(proposal *types.Proposal, cs *ConsensusState, sw *p2p.Switch) error { - peers := sw.Peers().List() - for _, peer := range peers { - // votes - var blockHash []byte // XXX proposal.BlockHash - blockHash = []byte{0, 1, 0, 2, 0, 3} - prevote, _ := cs.signVote(types.VoteTypePrevote, blockHash, proposal.BlockPartsHeader) - peer.Send(VoteChannel, struct{ ConsensusMessage }{&VoteMessage{prevote}}) - precommit, _ := cs.signVote(types.VoteTypePrecommit, blockHash, proposal.BlockPartsHeader) - peer.Send(VoteChannel, struct{ ConsensusMessage }{&VoteMessage{precommit}}) +//---------------------------------------- +// byzantine consensus reactor + +type ByzantineReactor struct { + Service + reactor *ConsensusReactor +} + +func NewByzantineReactor(conR *ConsensusReactor) *ByzantineReactor { + return &ByzantineReactor{ + Service: conR, + reactor: conR, } - return nil +} + +func (br *ByzantineReactor) SetSwitch(s *p2p.Switch) { br.reactor.SetSwitch(s) } +func (br *ByzantineReactor) GetChannels() []*p2p.ChannelDescriptor { return br.reactor.GetChannels() } +func (br *ByzantineReactor) AddPeer(peer *p2p.Peer) { + if !br.reactor.IsRunning() { + return + } + + // Create peerState for peer + peerState := NewPeerState(peer) + peer.Data.Set(types.PeerStateKey, peerState) + + // Send our state to peer. + // If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus(). + if !br.reactor.fastSync { + br.reactor.sendNewRoundStepMessage(peer) + } +} +func (br *ByzantineReactor) RemovePeer(peer *p2p.Peer, reason interface{}) { + br.reactor.RemovePeer(peer, reason) +} +func (br *ByzantineReactor) Receive(chID byte, peer *p2p.Peer, msgBytes []byte) { + br.reactor.Receive(chID, peer, msgBytes) } //---------------------------------------- // byzantine privValidator -type ByzantinePrivValidator struct { - Address []byte `json:"address"` - PubKey crypto.PubKey `json:"pub_key"` - // PrivKey should be empty if a Signer other than the default is being used. - PrivKey crypto.PrivKey `json:"priv_key"` +type ByzantinePrivValidator struct { + Address []byte `json:"address"` types.Signer `json:"-"` mtx sync.Mutex } -func (privVal *ByzantinePrivValidator) SetSigner(s types.Signer) { - privVal.Signer = s -} - -// Generates a new validator with private key. -func GenPrivValidator() *ByzantinePrivValidator { - privKeyBytes := new([64]byte) - copy(privKeyBytes[:32], crypto.CRandBytes(32)) - pubKeyBytes := ed25519.MakePublicKey(privKeyBytes) - pubKey := crypto.PubKeyEd25519(*pubKeyBytes) - privKey := crypto.PrivKeyEd25519(*privKeyBytes) +// Return a priv validator that will sign anything +func NewByzantinePrivValidator(pv *types.PrivValidator) *ByzantinePrivValidator { return &ByzantinePrivValidator{ - Address: pubKey.Address(), - PubKey: pubKey, - PrivKey: privKey, - Signer: types.NewDefaultSigner(privKey), + Address: pv.Address, + Signer: pv.Signer, } }