move routines to consensus state. drop locks and other go routines

This commit is contained in:
Ethan Buchman
2015-12-10 11:41:18 -05:00
parent 4b6e992a47
commit d9b55101e5
4 changed files with 295 additions and 262 deletions

View File

@@ -4,6 +4,7 @@ import (
"bytes"
"errors"
"fmt"
"reflect"
"sync"
"time"
@@ -24,6 +25,7 @@ var (
timeoutPrecommit0 = 1000 * time.Millisecond // After any +2/3 precommits received, wait this long for stragglers.
timeoutPrecommitDelta = 0500 * time.Millisecond // timeoutPrecommitN is timeoutPrecommit0 + timeoutPrecommitDelta*N
timeoutCommit = 2000 * time.Millisecond // After +2/3 commits received for committed block, wait this long for stragglers in the next height's RoundStepNewHeight.
)
var (
@@ -176,7 +178,7 @@ func (ti *timeoutInfo) String() string {
// Tracks consensus state across block heights and rounds.
type ConsensusState struct {
BaseService
QuitService
proxyAppCtx proxy.AppContext
blockStore *bc.BlockStore
@@ -206,14 +208,14 @@ func NewConsensusState(state *sm.State, proxyAppCtx proxy.AppContext, blockStore
mempool: mempool,
newStepCh: make(chan *RoundState, 10),
msgQueue: make(chan msgInfo, msgQueueSize),
tickChan: make(chan timeoutInfo, tickBufferSize),
tockChan: make(chan timeoutInfo, tockBufferSize),
tickChan: make(chan timeoutInfo),
tockChan: make(chan timeoutInfo),
}
cs.updateToState(state)
// Don't call scheduleRound0 yet.
// We do that upon Start().
cs.reconstructLastCommit(state)
cs.BaseService = *NewBaseService(log, "ConsensusState", cs)
cs.QuitService = *NewQuitService(log, "ConsensusState", cs)
return cs
}
@@ -263,10 +265,17 @@ func (cs *ConsensusState) NewStepCh() chan *RoundState {
// OnStart runs cs.BaseService.OnStart, launches the background routines via
// startRoutines, and then schedules round 0 for the current height (cs.Height)
// in its own goroutine. It always returns nil.
func (cs *ConsensusState) OnStart() error {
cs.BaseService.OnStart()
cs.startRoutines()
go cs.scheduleRound0(cs.Height)
return nil
}
// startRoutines launches the two long-running goroutines of the consensus
// state: the timeout routine and the receive routine.
func (cs *ConsensusState) startRoutines() {
go cs.timeoutRoutine() // receive requests for timeouts on tickChan and fire timeouts on tockChan
go cs.receiveRoutine() // serializes processing of proposals, block parts, votes and coordinates state transitions
}
func (cs *ConsensusState) OnStop() {
// It's asynchronous so, there's not much to stop.
cs.BaseService.OnStop()
@@ -279,6 +288,9 @@ func (cs *ConsensusState) updateHeight(height int) {
}
func (cs *ConsensusState) updateRoundStep(round int, step RoundStepType) {
cs.mtx.Lock()
defer cs.mtx.Unlock()
cs.Round = round
cs.Step = step
@@ -291,15 +303,140 @@ func (cs *ConsensusState) scheduleRound0(height int) {
//log.Info("scheduleRound0", "now", time.Now(), "startTime", cs.StartTime)
sleepDuration := cs.StartTime.Sub(time.Now())
log.Debug("scheduleRound0 by firing on tick", "height", height)
// if sleepDuration is 0 it will just relay it to tockChan right away
cs.scheduleHeightRoundStep(sleepDuration, height, 0, RoundStepNewHeight)
// TODO: this go-routine ...
go func() {
if 0 < sleepDuration {
time.Sleep(sleepDuration)
// TODO: event?
}
cs.EnterNewRound(height, 0, false)
}()
}
func (cs *ConsensusState) scheduleHeightRoundStep(duration time.Duration, height, round int, step RoundStepType) {
// scheduleTimeout requests a timeout of the given duration for
// height/round/step by sending a timeoutInfo on tickChan.
// The timeoutRoutine is expected to always be available to read from
// tickChan, so the send should not block for long.
// The request may be ignored if the timeoutRoutine has already scheduled a
// timeout for a later height/round/step.
func (cs *ConsensusState) scheduleTimeout(duration time.Duration, height, round int, step RoundStepType) {
	request := timeoutInfo{
		duration: duration,
		height:   height,
		round:    round,
		step:     step,
	}
	cs.tickChan <- request
}
// timeoutRoutine manages the single pending timeout of the consensus state.
//
// The state machine sends on tickChan to start a new timer. A pending timer
// is interrupted and replaced by a tick from a later height/round/step; ticks
// for an older height or round (or, once a step has been recorded, for the
// same or an earlier step of the same round) are ignored.
// When the timer fires, the recorded timeoutInfo is relayed on tockChan, so a
// tick with a duration of 0 is relayed to tockChan right away.
// The routine returns once cs.Quit is readable.
func (cs *ConsensusState) timeoutRoutine() {
	log.Debug("Starting timeout routine")
	// timer stays nil until the first tick is accepted; receiving from a nil
	// channel blocks forever, so the timer case cannot fire before then.
	var timer <-chan time.Time
	var ti timeoutInfo
	for {
		select {
		case newti := <-cs.tickChan:
			log.Debug("Received tick", "old_ti", ti, "new_ti", newti)

			// ignore tickers for old height/round/step
			if newti.height < ti.height {
				continue
			} else if newti.height == ti.height {
				if newti.round < ti.round {
					continue
				} else if newti.round == ti.round {
					if ti.step > 0 && newti.step <= ti.step {
						continue
					}
				}
			}

			ti = newti

			log.Info("Scheduling timeout", "dur", ti.duration, "height", ti.height, "round", ti.round, "step", ti.step)
			// Replacing the channel abandons the previous timer, if any.
			timer = time.After(ti.duration)
		case <-timer:
			// TODO: this could be collapsed into the receive routine by
			// managing the timer manually.
			log.Info("Timed out", "dur", ti.duration, "height", ti.height, "round", ti.round, "step", ti.step)
			// Relay from a separate goroutine so this loop stays free to
			// accept new ticks. The timeoutInfo is passed by value: the loop
			// may overwrite ti before the send completes, and capturing the
			// variable would race with that write. Selecting on Quit keeps
			// the goroutine from leaking when nobody reads tockChan anymore.
			go func(fired timeoutInfo) {
				select {
				case cs.tockChan <- fired:
				case <-cs.Quit:
				}
			}(ti)
		case <-cs.Quit:
			return
		}
	}
}
// receiveRoutine is the loop that processes everything which may affect the
// consensus state: messages arriving on msgQueue and fired timeouts arriving
// on tockChan. It is meant to keep the RoundState and be the only thing that
// updates it. The loop ends once cs.Quit is readable.
// TODO: this round state is still confounded with cs.RoundState
func (cs *ConsensusState) receiveRoutine() {
	for {
		// Taken once per iteration and handed to whichever handler runs.
		snapshot := cs.roundState
		select {
		case incoming := <-cs.msgQueue:
			// Proposals, block parts and votes; handling them may generate
			// internal events (votes, complete proposals, 2/3 majorities).
			cs.handleMsg(incoming, snapshot)
		case timeout := <-cs.tockChan:
			// If the timeout is relevant to the round state, handling it
			// moves on to the next step.
			cs.handleTimeout(timeout, snapshot)
		case <-cs.Quit:
			return
		}
	}
}
// handleMsg dispatches a queued message to the matching state update:
// proposals go to SetProposal, block parts to AddProposalBlockPart and votes
// to TryAddVote. Any error returned by those calls is logged; messages of an
// unknown type are logged as a warning.
//
// mi carries the message and the key of the peer it came from.
// rs is currently unused by this function.
func (cs *ConsensusState) handleMsg(mi msgInfo, rs RoundState) {
	var err error
	msg, peerKey := mi.msg, mi.peerKey
	switch msg := msg.(type) {
	case *ProposalMessage:
		err = cs.SetProposal(msg.Proposal)
	case *BlockPartMessage:
		_, err = cs.AddProposalBlockPart(msg.Height, msg.Part)
	case *VoteMessage:
		// attempt to add the vote and dupeout the validator if its a duplicate signature
		// NOTE: assign with "=" rather than ":=" so the outer err is set and
		// the error is reported by the check at the end of the function.
		var added bool
		added, err = cs.TryAddVote(msg.ValidatorIndex, msg.Vote, peerKey)
		if err == ErrAddingVote {
			// TODO: punish peer
		}

		if added {
			// If rs.Height == vote.Height && rs.Round < vote.Round,
			// the peer is sending us CatchupCommit precommits.
			// We could make note of this and help filter in broadcastHasVoteMessage().

			// XXX TODO: we want this routine to run in the consensus state so how do we call the reactor?!
			// conR.broadcastHasVoteMessage(msg.Vote, msg.ValidatorIndex)
		}
	default:
		// The logger takes key/value pairs after the message.
		log.Warn("Unknown msg type", "type", reflect.TypeOf(msg))
	}
	if err != nil {
		log.Error("error with msg", "error", err)
	}
}
// handleTimeout reacts to a timeout relayed on tockChan.
//
// The timeout is dropped unless it is for the height of rs and for a
// round/step that is not behind rs. Otherwise the step it was scheduled for
// selects the transition: Propose -> EnterPrevote, PrevoteWait ->
// EnterPrecommit, PrecommitWait -> EnterNewRound for the next round.
// Any other step panics.
func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs RoundState) {
	log.Debug("Received tock", "timeout", ti.duration, "height", ti.height, "round", ti.round, "step", ti.step)

	// timeouts must be for current height, round, step
	sameHeight := ti.height == rs.Height
	notBehind := ti.round > rs.Round || (ti.round == rs.Round && ti.step >= rs.Step)
	if !(sameHeight && notBehind) {
		log.Debug("Ignoring tock because we're ahead", "height", rs.Height, "round", rs.Round, "step", rs.Step)
		return
	}

	h, r := ti.height, ti.round
	switch ti.step {
	case RoundStepPropose:
		log.Debug("ENTERING PREVOTE")
		cs.EnterPrevote(h, r, true)
	case RoundStepPrevoteWait:
		cs.EnterPrecommit(h, r, true)
	case RoundStepPrecommitWait:
		cs.EnterNewRound(h, r+1, true)
	default:
		panic(Fmt("Invalid timeout step: %v", ti.step))
	}
}
// Updates ConsensusState and increments height to match that of state.
// The round becomes 0 and cs.Step becomes RoundStepNewHeight.
func (cs *ConsensusState) updateToState(state *sm.State) {
@@ -380,8 +517,6 @@ func (cs *ConsensusState) SetPrivValidator(priv *types.PrivValidator) {
// Enter: `startTime = commitTime+timeoutCommit` from NewHeight(height)
// NOTE: cs.StartTime was already set for height.
func (cs *ConsensusState) EnterNewRound(height int, round int, timedOut bool) {
cs.mtx.Lock()
defer cs.mtx.Unlock()
if cs.Height != height || round < cs.Round || (cs.Round == round && cs.Step != RoundStepNewHeight) {
log.Debug(Fmt("EnterNewRound(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step))
return
@@ -420,35 +555,22 @@ func (cs *ConsensusState) EnterNewRound(height int, round int, timedOut bool) {
cs.evsw.FireEvent(types.EventStringNewRound(), cs.RoundStateEvent())
// Immediately go to EnterPropose.
go cs.EnterPropose(height, round)
cs.EnterPropose(height, round)
}
// Enter: from NewRound(height,round).
func (cs *ConsensusState) EnterPropose(height int, round int) {
cs.mtx.Lock()
defer cs.mtx.Unlock()
// cs.mtx.Lock()
// cs.mtx.Unlock()
if cs.Height != height || round < cs.Round || (cs.Round == round && RoundStepPropose <= cs.Step) {
log.Debug(Fmt("EnterPropose(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step))
return
}
log.Info(Fmt("EnterPropose(%v/%v). Current: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step))
defer func() {
// Done EnterPropose:
cs.updateRoundStep(round, RoundStepPropose)
cs.newStepCh <- cs.getRoundState()
// If we have the whole proposal + POL, then goto Prevote now.
// else, we'll EnterPrevote when the rest of the proposal is received (in AddProposalBlockPart),
// or else after timeoutPropose
if cs.isProposalComplete() {
go cs.EnterPrevote(height, cs.Round, false)
}
}()
// This step times out after `timeoutPropose`
cs.tickChan <- timeoutInfo{timeoutPropose, height, round, RoundStepPropose}
log.Debug("started timer")
cs.scheduleTimeout(timeoutPropose, height, round, RoundStepPropose)
// Nothing more to do if we're not a validator
if cs.privValidator == nil {
@@ -461,6 +583,17 @@ func (cs *ConsensusState) EnterPropose(height int, round int) {
log.Info("EnterPropose: Our turn to propose", "proposer", cs.Validators.Proposer().Address, "privValidator", cs.privValidator)
cs.decideProposal(height, round)
}
// Done EnterPropose:
cs.updateRoundStep(round, RoundStepPropose)
cs.newStepCh <- cs.getRoundState()
// If we have the whole proposal + POL, then goto Prevote now.
// else, we'll EnterPrevote when the rest of the proposal is received (in AddProposalBlockPart),
// or else after timeoutPropose
if cs.isProposalComplete() {
cs.EnterPrevote(height, cs.Round, false)
}
}
func (cs *ConsensusState) decideProposal(height, round int) {
@@ -491,13 +624,13 @@ func (cs *ConsensusState) decideProposal(height, round int) {
cs.ProposalBlockParts = blockParts
// TODO: can we do better than just launching a go routine?
go func() {
/*go func() {
cs.msgQueue <- msgInfo{&ProposalMessage{proposal}, ""}
for i := 0; i < blockParts.Total(); i++ {
part := blockParts.GetPart(i)
cs.msgQueue <- msgInfo{&BlockPartMessage{cs.Height, cs.Round, part}, ""}
}
}()
}()*/
} else {
log.Warn("EnterPropose: Error signing proposal", "height", height, "round", round, "error", err)
}
@@ -574,8 +707,8 @@ func (cs *ConsensusState) createProposalBlock() (block *types.Block, blockParts
// Prevote for LockedBlock if we're locked, or ProposalBlock if valid.
// Otherwise vote nil.
func (cs *ConsensusState) EnterPrevote(height int, round int, timedOut bool) {
cs.mtx.Lock()
defer cs.mtx.Unlock()
//cs.mtx.Lock()
//defer cs.mtx.Unlock()
if cs.Height != height || round < cs.Round || (cs.Round == round && RoundStepPrevote <= cs.Step) {
log.Debug(Fmt("EnterPrevote(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step))
return
@@ -598,7 +731,9 @@ func (cs *ConsensusState) EnterPrevote(height int, round int, timedOut bool) {
// Done EnterPrevote:
cs.updateRoundStep(round, RoundStepPrevote)
log.Debug("wait on new step")
cs.newStepCh <- cs.getRoundState()
log.Debug("done new step")
// Once `addVote` hits any +2/3 prevotes, we will go to PrevoteWait
// (so we have more time to try and collect +2/3 prevotes for a single block)
@@ -637,8 +772,8 @@ func (cs *ConsensusState) doPrevote(height int, round int) {
// Enter: any +2/3 prevotes at next round.
func (cs *ConsensusState) EnterPrevoteWait(height int, round int) {
cs.mtx.Lock()
defer cs.mtx.Unlock()
//cs.mtx.Lock()
//defer cs.mtx.Unlock()
if cs.Height != height || round < cs.Round || (cs.Round == round && RoundStepPrevoteWait <= cs.Step) {
log.Debug(Fmt("EnterPrevoteWait(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step))
return
@@ -653,7 +788,7 @@ func (cs *ConsensusState) EnterPrevoteWait(height int, round int) {
cs.newStepCh <- cs.getRoundState()
// After `timeoutPrevote0+timeoutPrevoteDelta*round`, EnterPrecommit()
cs.tickChan <- timeoutInfo{timeoutPrevote0 + timeoutPrevoteDelta*time.Duration(round), height, round, RoundStepPrevote}
cs.scheduleTimeout(timeoutPrevote0+timeoutPrevoteDelta*time.Duration(round), height, round, RoundStepPrevoteWait)
}
// Enter: +2/3 precommits for block or nil.
@@ -663,8 +798,8 @@ func (cs *ConsensusState) EnterPrevoteWait(height int, round int) {
// else, unlock an existing lock and precommit nil if +2/3 of prevotes were nil,
// else, precommit nil otherwise.
func (cs *ConsensusState) EnterPrecommit(height int, round int, timedOut bool) {
cs.mtx.Lock()
defer cs.mtx.Unlock()
//cs.mtx.Lock()
// defer cs.mtx.Unlock()
if cs.Height != height || round < cs.Round || (cs.Round == round && RoundStepPrecommit <= cs.Step) {
log.Debug(Fmt("EnterPrecommit(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step))
return
@@ -762,8 +897,8 @@ func (cs *ConsensusState) EnterPrecommit(height int, round int, timedOut bool) {
// Enter: any +2/3 precommits for next round.
func (cs *ConsensusState) EnterPrecommitWait(height int, round int) {
cs.mtx.Lock()
defer cs.mtx.Unlock()
//cs.mtx.Lock()
//defer cs.mtx.Unlock()
if cs.Height != height || round < cs.Round || (cs.Round == round && RoundStepPrecommitWait <= cs.Step) {
log.Debug(Fmt("EnterPrecommitWait(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step))
return
@@ -778,14 +913,14 @@ func (cs *ConsensusState) EnterPrecommitWait(height int, round int) {
cs.newStepCh <- cs.getRoundState()
// After `timeoutPrecommit0+timeoutPrecommitDelta*round`, EnterNewRound()
cs.tickChan <- timeoutInfo{timeoutPrecommit0 + timeoutPrecommitDelta*time.Duration(round), height, round, RoundStepPrecommit}
cs.scheduleTimeout(timeoutPrecommit0+timeoutPrecommitDelta*time.Duration(round), height, round, RoundStepPrecommitWait)
}
// Enter: +2/3 precommits for block
func (cs *ConsensusState) EnterCommit(height int, commitRound int) {
cs.mtx.Lock()
defer cs.mtx.Unlock()
//cs.mtx.Lock()
//defer cs.mtx.Unlock()
if cs.Height != height || RoundStepCommit <= cs.Step {
log.Debug(Fmt("EnterCommit(%v/%v): Invalid args. Current step: %v/%v/%v", height, commitRound, cs.Height, cs.Round, cs.Step))
return
@@ -844,13 +979,14 @@ func (cs *ConsensusState) tryFinalizeCommit(height int) {
log.Warn("Attempt to finalize failed. We don't have the commit block.")
return
}
go cs.FinalizeCommit(height)
// go
cs.FinalizeCommit(height)
}
// Increment height and goto RoundStepNewHeight
func (cs *ConsensusState) FinalizeCommit(height int) {
cs.mtx.Lock()
defer cs.mtx.Unlock()
//cs.mtx.Lock()
//defer cs.mtx.Unlock()
if cs.Height != height || cs.Step != RoundStepCommit {
log.Debug(Fmt("FinalizeCommit(%v): Invalid args. Current step: %v/%v/%v", height, cs.Height, cs.Round, cs.Step))
@@ -879,7 +1015,8 @@ func (cs *ConsensusState) FinalizeCommit(height int) {
cs.updateToState(cs.stagedState)
// cs.StartTime is already set.
// Schedule Round0 to start soon.
go cs.scheduleRound0(height + 1)
// go
cs.scheduleRound0(height + 1)
// By here,
// * cs.Height has been incremented to height+1
@@ -891,8 +1028,8 @@ func (cs *ConsensusState) FinalizeCommit(height int) {
//-----------------------------------------------------------------------------
func (cs *ConsensusState) SetProposal(proposal *types.Proposal) error {
cs.mtx.Lock()
defer cs.mtx.Unlock()
//cs.mtx.Lock()
//defer cs.mtx.Unlock()
// Already have one
if cs.Proposal != nil {
@@ -928,8 +1065,8 @@ func (cs *ConsensusState) SetProposal(proposal *types.Proposal) error {
// NOTE: block is not necessarily valid.
// This can trigger us to go into EnterPrevote asynchronously (before we timeout of propose) or to attempt to commit
func (cs *ConsensusState) AddProposalBlockPart(height int, part *types.Part) (added bool, err error) {
cs.mtx.Lock()
defer cs.mtx.Unlock()
//cs.mtx.Lock()
//defer cs.mtx.Unlock()
// Blocks might be reused, so round mismatch is OK
if cs.Height != height {
@@ -953,7 +1090,8 @@ func (cs *ConsensusState) AddProposalBlockPart(height int, part *types.Part) (ad
log.Info("Received complete proposal", "hash", cs.ProposalBlock.Hash())
if cs.Step == RoundStepPropose && cs.isProposalComplete() {
// Move onto the next step
go cs.EnterPrevote(height, cs.Round, false)
//go cs.EnterPrevote(height, cs.Round, false)
cs.EnterPrevote(height, cs.Round, false)
} else if cs.Step == RoundStepCommit {
// If we're waiting on the proposal block...
cs.tryFinalizeCommit(height)
@@ -993,8 +1131,8 @@ func (cs *ConsensusState) TryAddVote(valIndex int, vote *types.Vote, peerKey str
}
func (cs *ConsensusState) AddVote(valIndex int, vote *types.Vote, peerKey string) (added bool, address []byte, err error) {
cs.mtx.Lock()
defer cs.mtx.Unlock()
// cs.mtx.Lock()
// defer cs.mtx.Unlock()
return cs.addVote(valIndex, vote, peerKey)
}
@@ -1050,19 +1188,20 @@ func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string
}
if cs.Round <= vote.Round && prevotes.HasTwoThirdsAny() {
// Round-skip over to PrevoteWait or goto Precommit.
go func() {
cs.EnterNewRound(height, vote.Round, false)
if prevotes.HasTwoThirdsMajority() {
cs.EnterPrecommit(height, vote.Round, false)
} else {
cs.EnterPrevote(height, vote.Round, false)
cs.EnterPrevoteWait(height, vote.Round)
}
}()
//go func() {
cs.EnterNewRound(height, vote.Round, false)
if prevotes.HasTwoThirdsMajority() {
cs.EnterPrecommit(height, vote.Round, false)
} else {
cs.EnterPrevote(height, vote.Round, false)
cs.EnterPrevoteWait(height, vote.Round)
}
//}()
} else if cs.Proposal != nil && 0 <= cs.Proposal.POLRound && cs.Proposal.POLRound == vote.Round {
// If the proposal is now complete, enter prevote of cs.Round.
if cs.isProposalComplete() {
go cs.EnterPrevote(height, cs.Round, false)
// go
cs.EnterPrevote(height, cs.Round, false)
}
}
case types.VoteTypePrecommit:
@@ -1070,21 +1209,21 @@ func (cs *ConsensusState) addVote(valIndex int, vote *types.Vote, peerKey string
log.Info("Added to precommit", "vote", vote, "precommits", precommits.StringShort())
hash, _, ok := precommits.TwoThirdsMajority()
if ok {
go func() {
if len(hash) == 0 {
cs.EnterNewRound(height, vote.Round+1, false)
} else {
cs.EnterNewRound(height, vote.Round, false)
cs.EnterPrecommit(height, vote.Round, false)
cs.EnterCommit(height, vote.Round)
}
}()
} else if cs.Round <= vote.Round && precommits.HasTwoThirdsAny() {
go func() {
//go func() {
if len(hash) == 0 {
cs.EnterNewRound(height, vote.Round+1, false)
} else {
cs.EnterNewRound(height, vote.Round, false)
cs.EnterPrecommit(height, vote.Round, false)
cs.EnterPrecommitWait(height, vote.Round)
}()
cs.EnterCommit(height, vote.Round)
}
//}()
} else if cs.Round <= vote.Round && precommits.HasTwoThirdsAny() {
//go func() {
cs.EnterNewRound(height, vote.Round, false)
cs.EnterPrecommit(height, vote.Round, false)
cs.EnterPrecommitWait(height, vote.Round)
//}()
}
default:
PanicSanity(Fmt("Unexpected vote type %X", vote.Type)) // Should not happen.
@@ -1161,9 +1300,9 @@ func (cs *ConsensusState) signAddVote(type_ byte, hash []byte, header types.Part
// XXX: maybe we can use a "personal" channel in the receiveRoutine select
// and fire these there so we don't possibly block on a full msgQueue.
// though if things are really backed up, we could block on the personal channel too
go func() {
/*go func() {
cs.msgQueue <- msgInfo{&VoteMessage{valIndex, vote}, ""}
}()
}()*/
return vote
} else {
log.Warn("Error signing vote", "height", cs.Height, "round", cs.Round, "vote", vote, "error", err)