diff --git a/consensus/reactor.go b/consensus/reactor.go index 1d8155c1..ce25442f 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -242,7 +242,7 @@ func (conR *ConsensusReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) ps.EnsureVoteBitArrays(height-1, lastCommitSize) ps.SetHasVote(msg.Vote) - conR.conS.peerMsgQueue <- msgInfo{msg, src.Key} + cs.peerMsgQueue <- msgInfo{msg, src.Key} default: // don't punish (leave room for soft upgrades) diff --git a/consensus/state.go b/consensus/state.go index 98255186..415f6af3 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -35,6 +35,7 @@ var ( //----------------------------------------------------------------------------- // RoundStepType enum type +// RoundStepType enumerates the state of the consensus state machine type RoundStepType uint8 // These must be numeric, ordered. const ( @@ -49,6 +50,7 @@ const ( // NOTE: RoundStepNewHeight acts as RoundStepCommitWait. ) +// String returns a string func (rs RoundStepType) String() string { switch rs { case RoundStepNewHeight: @@ -74,7 +76,8 @@ func (rs RoundStepType) String() string { //----------------------------------------------------------------------------- -// Immutable when returned from ConsensusState.GetRoundState() +// RoundState defines the internal consensus state. +// It is Immutable when returned from ConsensusState.GetRoundState() // TODO: Actually, only the top pointer is copied, // so access to field pointers is still racey type RoundState struct { @@ -96,6 +99,7 @@ type RoundState struct { LastValidators *types.ValidatorSet } +// RoundStateEvent returns the H/R/S of the RoundState as an event. func (rs *RoundState) RoundStateEvent() types.EventDataRoundState { edrs := types.EventDataRoundState{ Height: rs.Height, @@ -106,10 +110,12 @@ func (rs *RoundState) RoundStateEvent() types.EventDataRoundState { return edrs } +// String returns a string func (rs *RoundState) String() string { return rs.StringIndented("") } +// StringIndented returns a string func (rs *RoundState) StringIndented(indent string) string { return fmt.Sprintf(`RoundState{ %s H:%v R:%v S:%v @@ -138,6 +144,7 @@ func (rs *RoundState) StringIndented(indent string) string { indent) } +// StringShort returns a string func (rs *RoundState) StringShort() string { return fmt.Sprintf(`RoundState{H:%v R:%v S:%v ST:%v}`, rs.Height, rs.Round, rs.Step, rs.StartTime) @@ -167,13 +174,17 @@ func (ti *timeoutInfo) String() string { return fmt.Sprintf("%v ; %d/%d %v", ti.Duration, ti.Height, ti.Round, ti.Step) } +// PrivValidator is a validator that can sign votes and proposals. type PrivValidator interface { GetAddress() []byte SignVote(chainID string, vote *types.Vote) error SignProposal(chainID string, proposal *types.Proposal) error } -// Tracks consensus state across block heights and rounds. +// ConsensusState handles execution of the consensus algorithm. +// It processes votes and proposals, and upon reaching agreement, +// commits blocks to the chain and executes them against the application. +// The internal state machine receives input from peers, the internal validator, and from a timer. type ConsensusState struct { cmn.BaseService @@ -218,6 +229,7 @@ type ConsensusState struct { done chan struct{} } +// NewConsensusState returns a new ConsensusState. func NewConsensusState(config *cfg.ConsensusConfig, state *sm.State, proxyAppConn proxy.AppConnConsensus, blockStore types.BlockStore, mempool types.Mempool) *ConsensusState { cs := &ConsensusState{ config: config, @@ -256,17 +268,20 @@ func (cs *ConsensusState) SetEventSwitch(evsw types.EventSwitch) { cs.evsw = evsw } +// String returns a string. func (cs *ConsensusState) String() string { // better not to access shared variables return cmn.Fmt("ConsensusState") //(H:%v R:%v S:%v", cs.Height, cs.Round, cs.Step) } +// GetState returns a copy of the chain state. func (cs *ConsensusState) GetState() *sm.State { cs.mtx.Lock() defer cs.mtx.Unlock() return cs.state.Copy() } +// GetRoundState returns a copy of the internal consensus state. func (cs *ConsensusState) GetRoundState() *RoundState { cs.mtx.Lock() defer cs.mtx.Unlock() @@ -278,26 +293,28 @@ func (cs *ConsensusState) getRoundState() *RoundState { return &rs } +// GetValidators returns a copy of the current validators. func (cs *ConsensusState) GetValidators() (int, []*types.Validator) { cs.mtx.Lock() defer cs.mtx.Unlock() return cs.state.LastBlockHeight, cs.state.Validators.Copy().Validators } -// Sets our private validator account for signing votes. +// SetPrivValidator sets the private validator account for signing votes. func (cs *ConsensusState) SetPrivValidator(priv PrivValidator) { cs.mtx.Lock() defer cs.mtx.Unlock() cs.privValidator = priv } -// Set the local timer +// SetTimeoutTicker sets the local timer. It may be useful to overwrite for testing. func (cs *ConsensusState) SetTimeoutTicker(timeoutTicker TimeoutTicker) { cs.mtx.Lock() defer cs.mtx.Unlock() cs.timeoutTicker = timeoutTicker } +// LoadCommit loads the commit for a given height. func (cs *ConsensusState) LoadCommit(height int) *types.Commit { cs.mtx.Lock() defer cs.mtx.Unlock() @@ -307,6 +324,8 @@ func (cs *ConsensusState) LoadCommit(height int) *types.Commit { return cs.blockStore.LoadBlockCommit(height) } +// OnStart implements cmn.Service. +// It loads the latest state via the WAL, and starts the timeout and receive routines. func (cs *ConsensusState) OnStart() error { walFile := cs.config.WalFile() @@ -347,6 +366,7 @@ func (cs *ConsensusState) startRoutines(maxSteps int) { go cs.receiveRoutine(maxSteps) } +// OnStop implements cmn.Service. It stops all routines and waits for the WAL to finish. func (cs *ConsensusState) OnStop() { cs.BaseService.OnStop() @@ -358,13 +378,14 @@ func (cs *ConsensusState) OnStop() { } } +// Wait waits for the the main routine to return. // NOTE: be sure to Stop() the event switch and drain // any event channels or this may deadlock func (cs *ConsensusState) Wait() { <-cs.done } -// Open file to log all consensus messages and timeouts for deterministic accountability +// OpenWAL opens a file to log all consensus messages and timeouts for deterministic accountability func (cs *ConsensusState) OpenWAL(walFile string) (err error) { err = cmn.EnsureDir(path.Dir(walFile), 0700) if err != nil { @@ -387,11 +408,13 @@ func (cs *ConsensusState) OpenWAL(walFile string) (err error) { } //------------------------------------------------------------ -// Public interface for passing messages into the consensus state, -// possibly causing a state transition +// Public interface for passing messages into the consensus state, possibly causing a state transition. +// If peerKey == "", the msg is considered internal. +// Messages are added to the appropriate queue (peer or internal). +// If the queue is full, the function may block. // TODO: should these return anything or let callers just use events? -// May block on send if queue is full. +// AddVote inputs a vote. func (cs *ConsensusState) AddVote(vote *types.Vote, peerKey string) (added bool, err error) { if peerKey == "" { cs.internalMsgQueue <- msgInfo{&VoteMessage{vote}, ""} @@ -403,7 +426,7 @@ func (cs *ConsensusState) AddVote(vote *types.Vote, peerKey string) (added bool, return false, nil } -// May block on send if queue is full. +// SetProposal inputs a proposal. func (cs *ConsensusState) SetProposal(proposal *types.Proposal, peerKey string) error { if peerKey == "" { @@ -416,7 +439,7 @@ func (cs *ConsensusState) SetProposal(proposal *types.Proposal, peerKey string) return nil } -// May block on send if queue is full. +// AddProposalBlockPart inputs a part of the proposal block. func (cs *ConsensusState) AddProposalBlockPart(height, round int, part *types.Part, peerKey string) error { if peerKey == "" { @@ -429,7 +452,7 @@ func (cs *ConsensusState) AddProposalBlockPart(height, round int, part *types.Pa return nil } -// May block on send if queue is full. +// SetProposalAndBlock inputs the proposal and all block parts. func (cs *ConsensusState) SetProposalAndBlock(proposal *types.Proposal, block *types.Block, parts *types.PartSet, peerKey string) error { cs.SetProposal(proposal, peerKey) for i := 0; i < parts.Total(); i++ { @@ -704,9 +727,11 @@ func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs RoundState) { // State functions // Used internally by handleTimeout and handleMsg to make state transitions -// Enter: +2/3 precommits for nil at (height,round-1) +// Enter: `timeoutNewHeight` by startTime (commitTime+timeoutCommit), +// or, if SkipTimeout==true, after receiving all precommits from (height,round-1) // Enter: `timeoutPrecommits` after any +2/3 precommits from (height,round-1) -// Enter: `startTime = commitTime+timeoutCommit` from NewHeight(height) +// Enter: +2/3 precommits for nil at (height,round-1) +// Enter: +2/3 prevotes any or +2/3 precommits for block or any from (height, round) // NOTE: cs.StartTime was already set for height. func (cs *ConsensusState) enterNewRound(height int, round int) { if cs.Height != height || round < cs.Round || (cs.Round == round && cs.Step != RoundStepNewHeight) { @@ -749,7 +774,7 @@ func (cs *ConsensusState) enterNewRound(height int, round int) { cs.enterPropose(height, round) } -// Enter: from NewRound(height,round). +// Enter: from enterNewRound(height,round). func (cs *ConsensusState) enterPropose(height int, round int) { if cs.Height != height || round < cs.Round || (cs.Round == round && RoundStepPropose <= cs.Step) { cs.Logger.Debug(cmn.Fmt("enterPropose(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) @@ -927,7 +952,7 @@ func (cs *ConsensusState) defaultDoPrevote(height int, round int) { return } - // Valdiate proposal block + // Validate proposal block err := cs.state.ValidateBlock(cs.ProposalBlock) if err != nil { // ProposalBlock is invalid, prevote nil. @@ -964,8 +989,8 @@ func (cs *ConsensusState) enterPrevoteWait(height int, round int) { cs.scheduleTimeout(cs.config.Prevote(round), height, round, RoundStepPrevoteWait) } -// Enter: +2/3 precomits for block or nil. // Enter: `timeoutPrevote` after any +2/3 prevotes. +// Enter: +2/3 precomits for block or nil. // Enter: any +2/3 precommits for next round. // Lock & precommit the ProposalBlock if we have enough prevotes for it (a POL in this round) // else, unlock an existing lock and precommit nil if +2/3 of prevotes were nil, diff --git a/consensus/ticker.go b/consensus/ticker.go index e47a3412..317268b7 100644 --- a/consensus/ticker.go +++ b/consensus/ticker.go @@ -3,7 +3,7 @@ package consensus import ( "time" - . "github.com/tendermint/tmlibs/common" + cmn "github.com/tendermint/tmlibs/common" "github.com/tendermint/tmlibs/log" ) @@ -29,24 +29,26 @@ type TimeoutTicker interface { // Timeouts are scheduled along the tickChan, // and fired on the tockChan. type timeoutTicker struct { - BaseService + cmn.BaseService timer *time.Timer - tickChan chan timeoutInfo - tockChan chan timeoutInfo + tickChan chan timeoutInfo // for scheduling timeouts + tockChan chan timeoutInfo // for notifying about them } +// NewTimeoutTicker returns a new TimeoutTicker. func NewTimeoutTicker() TimeoutTicker { tt := &timeoutTicker{ timer: time.NewTimer(0), tickChan: make(chan timeoutInfo, tickTockBufferSize), tockChan: make(chan timeoutInfo, tickTockBufferSize), } - tt.BaseService = *NewBaseService(nil, "TimeoutTicker", tt) + tt.BaseService = *cmn.NewBaseService(nil, "TimeoutTicker", tt) tt.stopTimer() // don't want to fire until the first scheduled timeout return tt } +// OnStart implements cmn.Service. It starts the timeout routine. func (t *timeoutTicker) OnStart() error { go t.timeoutRoutine() @@ -54,16 +56,19 @@ func (t *timeoutTicker) OnStart() error { return nil } +// OnStop implements cmn.Service. It stops the timeout routine. func (t *timeoutTicker) OnStop() { t.BaseService.OnStop() t.stopTimer() } +// Chan returns a channel on which timeouts are sent. func (t *timeoutTicker) Chan() <-chan timeoutInfo { return t.tockChan } -// The timeoutRoutine is alwaya available to read from tickChan (it won't block). +// ScheduleTimeout schedules a new timeout by sending on the internal tickChan. +// The timeoutRoutine is alwaya available to read from tickChan, so this won't block. // The scheduling may fail if the timeoutRoutine has already scheduled a timeout for a later height/round/step. func (t *timeoutTicker) ScheduleTimeout(ti timeoutInfo) { t.tickChan <- ti