diff --git a/consensus/height_vote_set_test.go b/consensus/height_vote_set_test.go index 78733e41..3bede25c 100644 --- a/consensus/height_vote_set_test.go +++ b/consensus/height_vote_set_test.go @@ -25,11 +25,17 @@ func TestPeerCatchupRounds(t *testing.T) { vote1000_0 := makeVoteHR(t, 1, 1000, privVals, 0) added, err = hvs.AddVote(vote1000_0, "peer1") + if !added || err != nil { + t.Error("Expected to successfully add vote from peer", added, err) + } + + vote1001_0 := makeVoteHR(t, 1, 1001, privVals, 0) + added, err = hvs.AddVote(vote1001_0, "peer1") if added { t.Error("Expected to *not* add vote from peer, too many catchup rounds.") } - added, err = hvs.AddVote(vote1000_0, "peer2") + added, err = hvs.AddVote(vote1001_0, "peer2") if !added || err != nil { t.Error("Expected to successfully add vote from another peer") } diff --git a/consensus/reactor.go b/consensus/reactor.go index c1c2aaf3..0ece82ce 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -123,7 +123,6 @@ func (conR *ConsensusReactor) AddPeer(peer *p2p.Peer) { go conR.gossipDataRoutine(peer, peerState) go conR.gossipVotesRoutine(peer, peerState) go conR.queryMaj23Routine(peer, peerState) - go conR.replyMaj23Routine(peer, peerState) // Send our state to peer. // If we're fast_syncing, broadcast a RoundStepMessage later upon SwitchToConsensus(). @@ -178,10 +177,31 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) cs.mtx.Lock() height, votes := cs.Height, cs.Votes cs.mtx.Unlock() - if height == msg.Height { - votes.SetPeerMaj23(msg.Round, msg.Type, ps.Peer.Key, msg.BlockID) + if height != msg.Height { + return } - ps.ApplyVoteSetMaj23Message(msg) + // Peer claims to have a maj23 for some BlockID at H,R,S, + votes.SetPeerMaj23(msg.Round, msg.Type, ps.Peer.Key, msg.BlockID) + // Respond with a VoteSetBitsMessage showing which votes we have. + // (and consequently shows which we don't have) + var ourVotes *BitArray + switch msg.Type { + case types.VoteTypePrevote: + ourVotes = votes.Prevotes(msg.Round).BitArrayByBlockID(msg.BlockID) + case types.VoteTypePrecommit: + ourVotes = votes.Precommits(msg.Round).BitArrayByBlockID(msg.BlockID) + default: + log.Warn("Bad VoteSetBitsMessage field Type") + return + } + src.TrySend(VoteSetBitsChannel, struct{ ConsensusMessage }{&VoteSetBitsMessage{ + Height: msg.Height, + Round: msg.Round, + Type: msg.Type, + BlockID: msg.BlockID, + Votes: ourVotes, + }}) + default: log.Warn(Fmt("Unknown message type %v", reflect.TypeOf(msg))) } @@ -660,44 +680,6 @@ OUTER_LOOP: } } -func (conR *ConsensusReactor) replyMaj23Routine(peer *p2p.Peer, ps *PeerState) { - log := log.New("peer", peer) - -OUTER_LOOP: - for { - // Manage disconnects from self or peer. - if !peer.IsRunning() || !conR.IsRunning() { - log.Notice(Fmt("Stopping replyMaj23Routine for %v.", peer)) - return - } - rs := conR.conS.GetRoundState() - - // Process a VoteSetMaj23Message - msg := <-ps.Maj23Queue - if rs.Height == msg.Height { - var ourVotes *BitArray - switch msg.Type { - case types.VoteTypePrevote: - ourVotes = rs.Votes.Prevotes(msg.Round).BitArrayByBlockID(msg.BlockID) - case types.VoteTypePrecommit: - ourVotes = rs.Votes.Precommits(msg.Round).BitArrayByBlockID(msg.BlockID) - default: - log.Warn("Bad VoteSetBitsMessage field Type") - return - } - peer.TrySend(VoteSetBitsChannel, struct{ ConsensusMessage }{&VoteSetBitsMessage{ - Height: msg.Height, - Round: msg.Round, - Type: msg.Type, - BlockID: msg.BlockID, - Votes: ourVotes, - }}) - } - - continue OUTER_LOOP - } -} - func (conR *ConsensusReactor) String() string { return conR.StringIndented("") } @@ -770,7 +752,6 @@ type PeerState struct { mtx sync.Mutex PeerRoundState - Maj23Queue chan *VoteSetMaj23Message } func NewPeerState(peer *p2p.Peer) *PeerState { @@ -782,7 +763,6 @@ func NewPeerState(peer *p2p.Peer) *PeerState { LastCommitRound: -1, CatchupCommitRound: -1, }, - Maj23Queue: make(chan *VoteSetMaj23Message, 2), } } @@ -873,7 +853,7 @@ func (ps *PeerState) PickVoteToSend(votes types.VoteSetReader) (vote *types.Vote } func (ps *PeerState) getVoteBitArray(height, round int, type_ byte) *BitArray { - if type_ != types.VoteTypePrevote && type_ != types.VoteTypePrecommit { + if !types.IsVoteTypeValid(type_) { PanicSanity("Invalid vote type") } @@ -1077,21 +1057,6 @@ func (ps *PeerState) ApplyHasVoteMessage(msg *HasVoteMessage) { ps.setHasVote(msg.Height, msg.Round, msg.Type, msg.Index) } -// When a peer claims to have a maj23 for some BlockID at H,R,S, -// we will try to respond with a VoteSetBitsMessage showing which -// bits we already have (and which we don't yet have), -// but that happens in another goroutine. -func (ps *PeerState) ApplyVoteSetMaj23Message(msg *VoteSetMaj23Message) { - // ps.mtx.Lock() - // defer ps.mtx.Unlock() - - select { - case ps.Maj23Queue <- msg: - default: - // Just ignore if we're already processing messages. - } -} - // The peer has responded with a bitarray of votes that it has // of the corresponding BlockID. // ourVotes: BitArray of votes we have for msg.BlockID @@ -1121,11 +1086,9 @@ func (ps *PeerState) StringIndented(indent string) string { return fmt.Sprintf(`PeerState{ %s Key %v %s PRS %v -%s MjQ %v %s}`, indent, ps.Peer.Key, indent, ps.PeerRoundState.StringIndented(indent+" "), - indent, len(ps.Maj23Queue), indent) }