From cf3abe5096682fafeab49e257fcaadc106d63abb Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Tue, 25 Jul 2017 10:52:14 -0400 Subject: [PATCH] consensus: remove rs from handleMsg --- consensus/replay.go | 2 +- consensus/state.go | 18 ++++++++++++------ 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/consensus/replay.go b/consensus/replay.go index c4ed684c..0c7c27e9 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -82,7 +82,7 @@ func (cs *ConsensusState) readReplayMessage(msgBytes []byte, newStepCh chan inte "blockID", v.BlockID, "peer", peerKey) } - cs.handleMsg(m, cs.RoundState) + cs.handleMsg(m) case timeoutInfo: cs.Logger.Info("Replay: Timeout", "height", m.Height, "round", m.Round, "step", m.Step, "dur", m.Duration) cs.handleTimeout(m, cs.RoundState) diff --git a/consensus/state.go b/consensus/state.go index b18b40db..d1dec7f5 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -611,7 +611,8 @@ func (cs *ConsensusState) newStep() { // receiveRoutine handles messages which may cause state transitions. // it's argument (n) is the number of messages to process before exiting - use 0 to run forever // It keeps the RoundState and is the only thing that updates it. -// Updates (state transitions) happen on timeouts, complete proposals, and 2/3 majorities +// Updates (state transitions) happen on timeouts, complete proposals, and 2/3 majorities. +// ConsensusState must be locked before any internal state is updated. func (cs *ConsensusState) receiveRoutine(maxSteps int) { for { if maxSteps > 0 { @@ -625,17 +626,19 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) { var mi msgInfo select { - case rs_ := <-cs.txsAvailable: - cs.enterPropose(rs_.Height, rs_.Round) + case <-cs.txsAvailable: + // use nil for this special internal message signalling txs are available. + // no need to write this to the wal + // cs.handleMsg(msgInfo{nil, ""}, rs_) case mi = <-cs.peerMsgQueue: cs.wal.Save(mi) // handles proposals, block parts, votes // may generate internal events (votes, complete proposals, 2/3 majorities) - cs.handleMsg(mi, rs) + cs.handleMsg(mi) case mi = <-cs.internalMsgQueue: cs.wal.Save(mi) // handles proposals, block parts, votes - cs.handleMsg(mi, rs) + cs.handleMsg(mi) case ti := <-cs.timeoutTicker.Chan(): // tockChan: cs.wal.Save(ti) // if the timeout is relevant to the rs @@ -659,13 +662,16 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) { } // state transitions on complete-proposal, 2/3-any, 2/3-one -func (cs *ConsensusState) handleMsg(mi msgInfo, rs RoundState) { +func (cs *ConsensusState) handleMsg(mi msgInfo) { cs.mtx.Lock() defer cs.mtx.Unlock() var err error msg, peerKey := mi.Msg, mi.PeerKey switch msg := msg.(type) { + case nil: + // transactions are available, so enterPropose + // cs.enterPropose(rs.Height, rs.Round) case *ProposalMessage: // will not cause transition. // once proposal is set, we can receive block parts