mirror of
https://github.com/fluencelabs/tendermint
synced 2025-04-25 06:42:16 +00:00
consensus: Wait for proposal or timeout before prevote (#2540)
* Fix termination issues and improve tests * Improve formatting and tests based on reviewer feedback
This commit is contained in:
parent
e1538bf67e
commit
2363d88979
@ -50,6 +50,8 @@ BUG FIXES:
|
|||||||
- [node] \#2434 Make node respond to signal interrupts while sleeping for genesis time
|
- [node] \#2434 Make node respond to signal interrupts while sleeping for genesis time
|
||||||
- [consensus] [\#1690](https://github.com/tendermint/tendermint/issues/1690) wait for
|
- [consensus] [\#1690](https://github.com/tendermint/tendermint/issues/1690) wait for
|
||||||
timeoutPrecommit before starting next round
|
timeoutPrecommit before starting next round
|
||||||
|
- [consensus] [\#1745](https://github.com/tendermint/tendermint/issues/1745) wait for
|
||||||
|
Proposal or timeoutProposal before entering prevote
|
||||||
- [evidence] \#2515 fix db iter leak (@goolAdapter)
|
- [evidence] \#2515 fix db iter leak (@goolAdapter)
|
||||||
- [common/bit_array] Fixed a bug in the `Or` function
|
- [common/bit_array] Fixed a bug in the `Or` function
|
||||||
- [common/bit_array] Fixed a bug in the `Sub` function (@james-ray)
|
- [common/bit_array] Fixed a bug in the `Sub` function (@james-ray)
|
||||||
|
@ -565,7 +565,7 @@ func DefaultConsensusConfig() *ConsensusConfig {
|
|||||||
// TestConsensusConfig returns a configuration for testing the consensus service
|
// TestConsensusConfig returns a configuration for testing the consensus service
|
||||||
func TestConsensusConfig() *ConsensusConfig {
|
func TestConsensusConfig() *ConsensusConfig {
|
||||||
cfg := DefaultConsensusConfig()
|
cfg := DefaultConsensusConfig()
|
||||||
cfg.TimeoutPropose = 100 * time.Millisecond
|
cfg.TimeoutPropose = 40 * time.Millisecond
|
||||||
cfg.TimeoutProposeDelta = 1 * time.Millisecond
|
cfg.TimeoutProposeDelta = 1 * time.Millisecond
|
||||||
cfg.TimeoutPrevote = 10 * time.Millisecond
|
cfg.TimeoutPrevote = 10 * time.Millisecond
|
||||||
cfg.TimeoutPrevoteDelta = 1 * time.Millisecond
|
cfg.TimeoutPrevoteDelta = 1 * time.Millisecond
|
||||||
|
@ -39,8 +39,8 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// genesis, chain_id, priv_val
|
// genesis, chain_id, priv_val
|
||||||
var config *cfg.Config // NOTE: must be reset for each _test.go file
|
var config *cfg.Config // NOTE: must be reset for each _test.go file
|
||||||
var ensureTimeout = time.Second * 1 // must be in seconds because CreateEmptyBlocksInterval is
|
var ensureTimeout = time.Millisecond * 100
|
||||||
|
|
||||||
func ensureDir(dir string, mode os.FileMode) {
|
func ensureDir(dir string, mode os.FileMode) {
|
||||||
if err := cmn.EnsureDir(dir, mode); err != nil {
|
if err := cmn.EnsureDir(dir, mode); err != nil {
|
||||||
@ -317,67 +317,156 @@ func ensureNoNewEvent(ch <-chan interface{}, timeout time.Duration,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func ensureNoNewStep(stepCh <-chan interface{}) {
|
func ensureNoNewEventOnChannel(ch <-chan interface{}) {
|
||||||
ensureNoNewEvent(stepCh, ensureTimeout, "We should be stuck waiting, "+
|
ensureNoNewEvent(
|
||||||
"not moving to the next step")
|
ch,
|
||||||
|
ensureTimeout,
|
||||||
|
"We should be stuck waiting, not receiving new event on the channel")
|
||||||
|
}
|
||||||
|
|
||||||
|
func ensureNoNewRoundStep(stepCh <-chan interface{}) {
|
||||||
|
ensureNoNewEvent(
|
||||||
|
stepCh,
|
||||||
|
ensureTimeout,
|
||||||
|
"We should be stuck waiting, not receiving NewRoundStep event")
|
||||||
|
}
|
||||||
|
|
||||||
|
func ensureNoNewUnlock(unlockCh <-chan interface{}) {
|
||||||
|
ensureNoNewEvent(
|
||||||
|
unlockCh,
|
||||||
|
ensureTimeout,
|
||||||
|
"We should be stuck waiting, not receiving Unlock event")
|
||||||
}
|
}
|
||||||
|
|
||||||
func ensureNoNewTimeout(stepCh <-chan interface{}, timeout int64) {
|
func ensureNoNewTimeout(stepCh <-chan interface{}, timeout int64) {
|
||||||
timeoutDuration := time.Duration(timeout*5) * time.Nanosecond
|
timeoutDuration := time.Duration(timeout*5) * time.Nanosecond
|
||||||
ensureNoNewEvent(stepCh, timeoutDuration, "We should be stuck waiting, "+
|
ensureNoNewEvent(
|
||||||
"not moving to the next step")
|
stepCh,
|
||||||
|
timeoutDuration,
|
||||||
|
"We should be stuck waiting, not receiving NewTimeout event")
|
||||||
}
|
}
|
||||||
|
|
||||||
func ensureNewEvent(ch <-chan interface{}, timeout time.Duration, errorMessage string) {
|
func ensureNewEvent(
|
||||||
|
ch <-chan interface{},
|
||||||
|
height int64,
|
||||||
|
round int,
|
||||||
|
timeout time.Duration,
|
||||||
|
errorMessage string) {
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-time.After(timeout):
|
case <-time.After(timeout):
|
||||||
panic(errorMessage)
|
panic(errorMessage)
|
||||||
case <-ch:
|
case ev := <-ch:
|
||||||
break
|
rs, ok := ev.(types.EventDataRoundState)
|
||||||
|
if !ok {
|
||||||
|
panic(
|
||||||
|
fmt.Sprintf(
|
||||||
|
"expected a EventDataRoundState, got %v.Wrong subscription channel?",
|
||||||
|
reflect.TypeOf(rs)))
|
||||||
|
}
|
||||||
|
if rs.Height != height {
|
||||||
|
panic(fmt.Sprintf("expected height %v, got %v", height, rs.Height))
|
||||||
|
}
|
||||||
|
if rs.Round != round {
|
||||||
|
panic(fmt.Sprintf("expected round %v, got %v", round, rs.Round))
|
||||||
|
}
|
||||||
|
// TODO: We could check also for a step at this point!
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func ensureNewStep(stepCh <-chan interface{}) {
|
func ensureNewRoundStep(stepCh <-chan interface{}, height int64, round int) {
|
||||||
ensureNewEvent(stepCh, ensureTimeout,
|
ensureNewEvent(
|
||||||
|
stepCh,
|
||||||
|
height,
|
||||||
|
round,
|
||||||
|
ensureTimeout,
|
||||||
"Timeout expired while waiting for NewStep event")
|
"Timeout expired while waiting for NewStep event")
|
||||||
}
|
}
|
||||||
|
|
||||||
func ensureNewRound(roundCh <-chan interface{}) {
|
func ensureNewVote(voteCh <-chan interface{}, height int64, round int) {
|
||||||
ensureNewEvent(roundCh, ensureTimeout,
|
|
||||||
"Timeout expired while waiting for NewRound event")
|
|
||||||
}
|
|
||||||
|
|
||||||
func ensureNewTimeout(timeoutCh <-chan interface{}, timeout int64) {
|
|
||||||
timeoutDuration := time.Duration(timeout*5) * time.Nanosecond
|
|
||||||
ensureNewEvent(timeoutCh, timeoutDuration,
|
|
||||||
"Timeout expired while waiting for NewTimeout event")
|
|
||||||
}
|
|
||||||
|
|
||||||
func ensureNewProposal(proposalCh <-chan interface{}) {
|
|
||||||
ensureNewEvent(proposalCh, ensureTimeout,
|
|
||||||
"Timeout expired while waiting for NewProposal event")
|
|
||||||
}
|
|
||||||
|
|
||||||
func ensureNewBlock(blockCh <-chan interface{}) {
|
|
||||||
ensureNewEvent(blockCh, ensureTimeout,
|
|
||||||
"Timeout expired while waiting for NewBlock event")
|
|
||||||
}
|
|
||||||
|
|
||||||
func ensureNewVote(voteCh <-chan interface{}) {
|
|
||||||
ensureNewEvent(voteCh, ensureTimeout,
|
|
||||||
"Timeout expired while waiting for NewVote event")
|
|
||||||
}
|
|
||||||
|
|
||||||
func ensureNewUnlock(unlockCh <-chan interface{}) {
|
|
||||||
ensureNewEvent(unlockCh, ensureTimeout,
|
|
||||||
"Timeout expired while waiting for NewUnlock event")
|
|
||||||
}
|
|
||||||
|
|
||||||
func ensureVote(voteCh chan interface{}, height int64, round int,
|
|
||||||
voteType byte) {
|
|
||||||
select {
|
select {
|
||||||
case <-time.After(ensureTimeout):
|
case <-time.After(ensureTimeout):
|
||||||
break
|
break
|
||||||
|
case v := <-voteCh:
|
||||||
|
edv, ok := v.(types.EventDataVote)
|
||||||
|
if !ok {
|
||||||
|
panic(fmt.Sprintf("expected a *types.Vote, "+
|
||||||
|
"got %v. wrong subscription channel?",
|
||||||
|
reflect.TypeOf(v)))
|
||||||
|
}
|
||||||
|
vote := edv.Vote
|
||||||
|
if vote.Height != height {
|
||||||
|
panic(fmt.Sprintf("expected height %v, got %v", height, vote.Height))
|
||||||
|
}
|
||||||
|
if vote.Round != round {
|
||||||
|
panic(fmt.Sprintf("expected round %v, got %v", round, vote.Round))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func ensureNewRound(roundCh <-chan interface{}, height int64, round int) {
|
||||||
|
ensureNewEvent(roundCh, height, round, ensureTimeout,
|
||||||
|
"Timeout expired while waiting for NewRound event")
|
||||||
|
}
|
||||||
|
|
||||||
|
func ensureNewTimeout(timeoutCh <-chan interface{}, height int64, round int, timeout int64) {
|
||||||
|
timeoutDuration := time.Duration(timeout*3) * time.Nanosecond
|
||||||
|
ensureNewEvent(timeoutCh, height, round, timeoutDuration,
|
||||||
|
"Timeout expired while waiting for NewTimeout event")
|
||||||
|
}
|
||||||
|
|
||||||
|
func ensureNewProposal(proposalCh <-chan interface{}, height int64, round int) {
|
||||||
|
ensureNewEvent(proposalCh, height, round, ensureTimeout,
|
||||||
|
"Timeout expired while waiting for NewProposal event")
|
||||||
|
}
|
||||||
|
|
||||||
|
func ensureNewBlock(blockCh <-chan interface{}, height int64) {
|
||||||
|
select {
|
||||||
|
case <-time.After(ensureTimeout):
|
||||||
|
panic("Timeout expired while waiting for NewBlock event")
|
||||||
|
case ev := <-blockCh:
|
||||||
|
block, ok := ev.(types.EventDataNewBlock)
|
||||||
|
if !ok {
|
||||||
|
panic(fmt.Sprintf("expected a *types.EventDataNewBlock, "+
|
||||||
|
"got %v. wrong subscription channel?",
|
||||||
|
reflect.TypeOf(block)))
|
||||||
|
}
|
||||||
|
if block.Block.Height != height {
|
||||||
|
panic(fmt.Sprintf("expected height %v, got %v", height, block.Block.Height))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func ensureNewBlockHeader(blockCh <-chan interface{}, height int64, blockHash cmn.HexBytes) {
|
||||||
|
select {
|
||||||
|
case <-time.After(ensureTimeout):
|
||||||
|
panic("Timeout expired while waiting for NewBlockHeader event")
|
||||||
|
case ev := <-blockCh:
|
||||||
|
blockHeader, ok := ev.(types.EventDataNewBlockHeader)
|
||||||
|
if !ok {
|
||||||
|
panic(fmt.Sprintf("expected a *types.EventDataNewBlockHeader, "+
|
||||||
|
"got %v. wrong subscription channel?",
|
||||||
|
reflect.TypeOf(blockHeader)))
|
||||||
|
}
|
||||||
|
if blockHeader.Header.Height != height {
|
||||||
|
panic(fmt.Sprintf("expected height %v, got %v", height, blockHeader.Header.Height))
|
||||||
|
}
|
||||||
|
if !bytes.Equal(blockHeader.Header.Hash(), blockHash) {
|
||||||
|
panic(fmt.Sprintf("expected header %X, got %X", blockHash, blockHeader.Header.Hash()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func ensureNewUnlock(unlockCh <-chan interface{}, height int64, round int) {
|
||||||
|
ensureNewEvent(unlockCh, height, round, ensureTimeout,
|
||||||
|
"Timeout expired while waiting for NewUnlock event")
|
||||||
|
}
|
||||||
|
|
||||||
|
func ensureVote(voteCh <-chan interface{}, height int64, round int,
|
||||||
|
voteType byte) {
|
||||||
|
select {
|
||||||
|
case <-time.After(ensureTimeout):
|
||||||
|
panic("Timeout expired while waiting for NewVote event")
|
||||||
case v := <-voteCh:
|
case v := <-voteCh:
|
||||||
edv, ok := v.(types.EventDataVote)
|
edv, ok := v.(types.EventDataVote)
|
||||||
if !ok {
|
if !ok {
|
||||||
@ -398,6 +487,14 @@ func ensureVote(voteCh chan interface{}, height int64, round int,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func ensureNewEventOnChannel(ch <-chan interface{}) {
|
||||||
|
select {
|
||||||
|
case <-time.After(ensureTimeout):
|
||||||
|
panic("Timeout expired while waiting for new activity on the channel")
|
||||||
|
case <-ch:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
//-------------------------------------------------------------------------------
|
//-------------------------------------------------------------------------------
|
||||||
// consensus nets
|
// consensus nets
|
||||||
|
|
||||||
|
@ -28,12 +28,12 @@ func TestMempoolNoProgressUntilTxsAvailable(t *testing.T) {
|
|||||||
newBlockCh := subscribe(cs.eventBus, types.EventQueryNewBlock)
|
newBlockCh := subscribe(cs.eventBus, types.EventQueryNewBlock)
|
||||||
startTestRound(cs, height, round)
|
startTestRound(cs, height, round)
|
||||||
|
|
||||||
ensureNewStep(newBlockCh) // first block gets committed
|
ensureNewEventOnChannel(newBlockCh) // first block gets committed
|
||||||
ensureNoNewStep(newBlockCh)
|
ensureNoNewEventOnChannel(newBlockCh)
|
||||||
deliverTxsRange(cs, 0, 1)
|
deliverTxsRange(cs, 0, 1)
|
||||||
ensureNewStep(newBlockCh) // commit txs
|
ensureNewEventOnChannel(newBlockCh) // commit txs
|
||||||
ensureNewStep(newBlockCh) // commit updated app hash
|
ensureNewEventOnChannel(newBlockCh) // commit updated app hash
|
||||||
ensureNoNewStep(newBlockCh)
|
ensureNoNewEventOnChannel(newBlockCh)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMempoolProgressAfterCreateEmptyBlocksInterval(t *testing.T) {
|
func TestMempoolProgressAfterCreateEmptyBlocksInterval(t *testing.T) {
|
||||||
@ -46,9 +46,9 @@ func TestMempoolProgressAfterCreateEmptyBlocksInterval(t *testing.T) {
|
|||||||
newBlockCh := subscribe(cs.eventBus, types.EventQueryNewBlock)
|
newBlockCh := subscribe(cs.eventBus, types.EventQueryNewBlock)
|
||||||
startTestRound(cs, height, round)
|
startTestRound(cs, height, round)
|
||||||
|
|
||||||
ensureNewStep(newBlockCh) // first block gets committed
|
ensureNewEventOnChannel(newBlockCh) // first block gets committed
|
||||||
ensureNoNewStep(newBlockCh) // then we dont make a block ...
|
ensureNoNewEventOnChannel(newBlockCh) // then we dont make a block ...
|
||||||
ensureNewStep(newBlockCh) // until the CreateEmptyBlocksInterval has passed
|
ensureNewEventOnChannel(newBlockCh) // until the CreateEmptyBlocksInterval has passed
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMempoolProgressInHigherRound(t *testing.T) {
|
func TestMempoolProgressInHigherRound(t *testing.T) {
|
||||||
@ -72,13 +72,19 @@ func TestMempoolProgressInHigherRound(t *testing.T) {
|
|||||||
}
|
}
|
||||||
startTestRound(cs, height, round)
|
startTestRound(cs, height, round)
|
||||||
|
|
||||||
ensureNewStep(newRoundCh) // first round at first height
|
ensureNewRoundStep(newRoundCh, height, round) // first round at first height
|
||||||
ensureNewStep(newBlockCh) // first block gets committed
|
ensureNewEventOnChannel(newBlockCh) // first block gets committed
|
||||||
ensureNewStep(newRoundCh) // first round at next height
|
|
||||||
deliverTxsRange(cs, 0, 1) // we deliver txs, but dont set a proposal so we get the next round
|
height = height + 1 // moving to the next height
|
||||||
<-timeoutCh
|
round = 0
|
||||||
ensureNewStep(newRoundCh) // wait for the next round
|
|
||||||
ensureNewStep(newBlockCh) // now we can commit the block
|
ensureNewRoundStep(newRoundCh, height, round) // first round at next height
|
||||||
|
deliverTxsRange(cs, 0, 1) // we deliver txs, but dont set a proposal so we get the next round
|
||||||
|
ensureNewTimeout(timeoutCh, height, round, cs.config.TimeoutPropose.Nanoseconds())
|
||||||
|
|
||||||
|
round = round + 1 // moving to the next round
|
||||||
|
ensureNewRoundStep(newRoundCh, height, round) // wait for the next round
|
||||||
|
ensureNewEventOnChannel(newBlockCh) // now we can commit the block
|
||||||
}
|
}
|
||||||
|
|
||||||
func deliverTxsRange(cs *ConsensusState, start, end int) {
|
func deliverTxsRange(cs *ConsensusState, start, end int) {
|
||||||
|
@ -83,7 +83,8 @@ type ConsensusState struct {
|
|||||||
// internal state
|
// internal state
|
||||||
mtx sync.RWMutex
|
mtx sync.RWMutex
|
||||||
cstypes.RoundState
|
cstypes.RoundState
|
||||||
state sm.State // State until height-1.
|
triggeredTimeoutPrecommit bool
|
||||||
|
state sm.State // State until height-1.
|
||||||
|
|
||||||
// state changes may be triggered by: msgs from peers,
|
// state changes may be triggered by: msgs from peers,
|
||||||
// msgs from ourself, or by timeouts
|
// msgs from ourself, or by timeouts
|
||||||
@ -711,6 +712,7 @@ func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs cstypes.RoundState) {
|
|||||||
cs.enterPrecommit(ti.Height, ti.Round)
|
cs.enterPrecommit(ti.Height, ti.Round)
|
||||||
case cstypes.RoundStepPrecommitWait:
|
case cstypes.RoundStepPrecommitWait:
|
||||||
cs.eventBus.PublishEventTimeoutWait(cs.RoundStateEvent())
|
cs.eventBus.PublishEventTimeoutWait(cs.RoundStateEvent())
|
||||||
|
cs.enterPrecommit(ti.Height, ti.Round)
|
||||||
cs.enterNewRound(ti.Height, ti.Round+1)
|
cs.enterNewRound(ti.Height, ti.Round+1)
|
||||||
default:
|
default:
|
||||||
panic(fmt.Sprintf("Invalid timeout step: %v", ti.Step))
|
panic(fmt.Sprintf("Invalid timeout step: %v", ti.Step))
|
||||||
@ -772,6 +774,7 @@ func (cs *ConsensusState) enterNewRound(height int64, round int) {
|
|||||||
cs.ProposalBlockParts = nil
|
cs.ProposalBlockParts = nil
|
||||||
}
|
}
|
||||||
cs.Votes.SetRound(round + 1) // also track next round (round+1) to allow round-skipping
|
cs.Votes.SetRound(round + 1) // also track next round (round+1) to allow round-skipping
|
||||||
|
cs.triggeredTimeoutPrecommit = false
|
||||||
|
|
||||||
cs.eventBus.PublishEventNewRound(cs.RoundStateEvent())
|
cs.eventBus.PublishEventNewRound(cs.RoundStateEvent())
|
||||||
cs.metrics.Rounds.Set(float64(round))
|
cs.metrics.Rounds.Set(float64(round))
|
||||||
@ -782,7 +785,8 @@ func (cs *ConsensusState) enterNewRound(height int64, round int) {
|
|||||||
waitForTxs := cs.config.WaitForTxs() && round == 0 && !cs.needProofBlock(height)
|
waitForTxs := cs.config.WaitForTxs() && round == 0 && !cs.needProofBlock(height)
|
||||||
if waitForTxs {
|
if waitForTxs {
|
||||||
if cs.config.CreateEmptyBlocksInterval > 0 {
|
if cs.config.CreateEmptyBlocksInterval > 0 {
|
||||||
cs.scheduleTimeout(cs.config.CreateEmptyBlocksInterval, height, round, cstypes.RoundStepNewRound)
|
cs.scheduleTimeout(cs.config.CreateEmptyBlocksInterval, height, round,
|
||||||
|
cstypes.RoundStepNewRound)
|
||||||
}
|
}
|
||||||
go cs.proposalHeartbeat(height, round)
|
go cs.proposalHeartbeat(height, round)
|
||||||
} else {
|
} else {
|
||||||
@ -1013,6 +1017,7 @@ func (cs *ConsensusState) enterPrevote(height int64, round int) {
|
|||||||
|
|
||||||
func (cs *ConsensusState) defaultDoPrevote(height int64, round int) {
|
func (cs *ConsensusState) defaultDoPrevote(height int64, round int) {
|
||||||
logger := cs.Logger.With("height", height, "round", round)
|
logger := cs.Logger.With("height", height, "round", round)
|
||||||
|
|
||||||
// If a block is locked, prevote that.
|
// If a block is locked, prevote that.
|
||||||
if cs.LockedBlock != nil {
|
if cs.LockedBlock != nil {
|
||||||
logger.Info("enterPrevote: Block was locked")
|
logger.Info("enterPrevote: Block was locked")
|
||||||
@ -1171,8 +1176,12 @@ func (cs *ConsensusState) enterPrecommit(height int64, round int) {
|
|||||||
func (cs *ConsensusState) enterPrecommitWait(height int64, round int) {
|
func (cs *ConsensusState) enterPrecommitWait(height int64, round int) {
|
||||||
logger := cs.Logger.With("height", height, "round", round)
|
logger := cs.Logger.With("height", height, "round", round)
|
||||||
|
|
||||||
if cs.Height != height || round < cs.Round || (cs.Round == round && cstypes.RoundStepPrecommitWait <= cs.Step) {
|
if cs.Height != height || round < cs.Round || (cs.Round == round && cs.triggeredTimeoutPrecommit) {
|
||||||
logger.Debug(fmt.Sprintf("enterPrecommitWait(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step))
|
logger.Debug(
|
||||||
|
fmt.Sprintf(
|
||||||
|
"enterPrecommitWait(%v/%v): Invalid args. "+
|
||||||
|
"Current state is Height/Round: %v/%v/, triggeredTimeoutPrecommit:%v",
|
||||||
|
height, round, cs.Height, cs.Round, cs.triggeredTimeoutPrecommit))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if !cs.Votes.Precommits(round).HasTwoThirdsAny() {
|
if !cs.Votes.Precommits(round).HasTwoThirdsAny() {
|
||||||
@ -1182,7 +1191,7 @@ func (cs *ConsensusState) enterPrecommitWait(height int64, round int) {
|
|||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
// Done enterPrecommitWait:
|
// Done enterPrecommitWait:
|
||||||
cs.updateRoundStep(round, cstypes.RoundStepPrecommitWait)
|
cs.triggeredTimeoutPrecommit = true
|
||||||
cs.newStep()
|
cs.newStep()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
@ -1495,6 +1504,9 @@ func (cs *ConsensusState) addProposalBlockPart(msg *BlockPartMessage, peerID p2p
|
|||||||
if cs.Step <= cstypes.RoundStepPropose && cs.isProposalComplete() {
|
if cs.Step <= cstypes.RoundStepPropose && cs.isProposalComplete() {
|
||||||
// Move onto the next step
|
// Move onto the next step
|
||||||
cs.enterPrevote(height, cs.Round)
|
cs.enterPrevote(height, cs.Round)
|
||||||
|
if hasTwoThirds { // this is optimisation as this will be triggered when prevote is added
|
||||||
|
cs.enterPrecommit(height, cs.Round)
|
||||||
|
}
|
||||||
} else if cs.Step == cstypes.RoundStepCommit {
|
} else if cs.Step == cstypes.RoundStepCommit {
|
||||||
// If we're waiting on the proposal block...
|
// If we're waiting on the proposal block...
|
||||||
cs.tryFinalizeCommit(height)
|
cs.tryFinalizeCommit(height)
|
||||||
@ -1609,7 +1621,7 @@ func (cs *ConsensusState) addVote(vote *types.Vote, peerID p2p.ID) (added bool,
|
|||||||
// Update Valid* if we can.
|
// Update Valid* if we can.
|
||||||
// NOTE: our proposal block may be nil or not what received a polka..
|
// NOTE: our proposal block may be nil or not what received a polka..
|
||||||
// TODO: we may want to still update the ValidBlock and obtain it via gossipping
|
// TODO: we may want to still update the ValidBlock and obtain it via gossipping
|
||||||
if !blockID.IsZero() &&
|
if len(blockID.Hash) != 0 &&
|
||||||
(cs.ValidRound < vote.Round) &&
|
(cs.ValidRound < vote.Round) &&
|
||||||
(vote.Round <= cs.Round) &&
|
(vote.Round <= cs.Round) &&
|
||||||
cs.ProposalBlock.HashesTo(blockID.Hash) {
|
cs.ProposalBlock.HashesTo(blockID.Hash) {
|
||||||
@ -1621,14 +1633,14 @@ func (cs *ConsensusState) addVote(vote *types.Vote, peerID p2p.ID) (added bool,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// If +2/3 prevotes for *anything* for this or future round:
|
// If +2/3 prevotes for *anything* for future round:
|
||||||
if cs.Round <= vote.Round && prevotes.HasTwoThirdsAny() {
|
if cs.Round < vote.Round && prevotes.HasTwoThirdsAny() {
|
||||||
// Round-skip over to PrevoteWait or goto Precommit.
|
// Round-skip if there is any 2/3+ of votes ahead of us
|
||||||
cs.enterNewRound(height, vote.Round) // if the vote is ahead of us
|
cs.enterNewRound(height, vote.Round)
|
||||||
|
} else if cs.Round == vote.Round && cstypes.RoundStepPrevote <= cs.Step { // current round
|
||||||
if prevotes.HasTwoThirdsMajority() {
|
if prevotes.HasTwoThirdsMajority() {
|
||||||
cs.enterPrecommit(height, vote.Round)
|
cs.enterPrecommit(height, vote.Round)
|
||||||
} else {
|
} else if prevotes.HasTwoThirdsAny() {
|
||||||
cs.enterPrevote(height, vote.Round) // if the vote is ahead of us
|
|
||||||
cs.enterPrevoteWait(height, vote.Round)
|
cs.enterPrevoteWait(height, vote.Round)
|
||||||
}
|
}
|
||||||
} else if cs.Proposal != nil && 0 <= cs.Proposal.POLRound && cs.Proposal.POLRound == vote.Round {
|
} else if cs.Proposal != nil && 0 <= cs.Proposal.POLRound && cs.Proposal.POLRound == vote.Round {
|
||||||
@ -1641,21 +1653,25 @@ func (cs *ConsensusState) addVote(vote *types.Vote, peerID p2p.ID) (added bool,
|
|||||||
case types.VoteTypePrecommit:
|
case types.VoteTypePrecommit:
|
||||||
precommits := cs.Votes.Precommits(vote.Round)
|
precommits := cs.Votes.Precommits(vote.Round)
|
||||||
cs.Logger.Info("Added to precommit", "vote", vote, "precommits", precommits.StringShort())
|
cs.Logger.Info("Added to precommit", "vote", vote, "precommits", precommits.StringShort())
|
||||||
|
|
||||||
blockID, ok := precommits.TwoThirdsMajority()
|
blockID, ok := precommits.TwoThirdsMajority()
|
||||||
if ok && len(blockID.Hash) != 0 {
|
if ok {
|
||||||
// Executed as TwoThirdsMajority could be from a higher round
|
// Executed as TwoThirdsMajority could be from a higher round
|
||||||
cs.enterNewRound(height, vote.Round)
|
cs.enterNewRound(height, vote.Round)
|
||||||
cs.enterPrecommit(height, vote.Round)
|
cs.enterPrecommit(height, vote.Round)
|
||||||
cs.enterCommit(height, vote.Round)
|
if len(blockID.Hash) != 0 {
|
||||||
|
cs.enterCommit(height, vote.Round)
|
||||||
if cs.config.SkipTimeoutCommit && precommits.HasAll() {
|
if cs.config.SkipTimeoutCommit && precommits.HasAll() {
|
||||||
cs.enterNewRound(cs.Height, 0)
|
cs.enterNewRound(cs.Height, 0)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
cs.enterPrecommitWait(height, vote.Round)
|
||||||
}
|
}
|
||||||
} else if cs.Round <= vote.Round && precommits.HasTwoThirdsAny() {
|
} else if cs.Round <= vote.Round && precommits.HasTwoThirdsAny() {
|
||||||
cs.enterNewRound(height, vote.Round)
|
cs.enterNewRound(height, vote.Round)
|
||||||
cs.enterPrecommit(height, vote.Round)
|
|
||||||
cs.enterPrecommitWait(height, vote.Round)
|
cs.enterPrecommitWait(height, vote.Round)
|
||||||
}
|
}
|
||||||
|
|
||||||
default:
|
default:
|
||||||
panic(fmt.Sprintf("Unexpected vote type %X", vote.Type)) // go-wire should prevent this.
|
panic(fmt.Sprintf("Unexpected vote type %X", vote.Type)) // go-wire should prevent this.
|
||||||
}
|
}
|
||||||
|
File diff suppressed because it is too large
Load Diff
@ -106,7 +106,7 @@ func DefaultMetricsProvider(config *cfg.InstrumentationConfig) MetricsProvider {
|
|||||||
return func() (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics) {
|
return func() (*cs.Metrics, *p2p.Metrics, *mempl.Metrics, *sm.Metrics) {
|
||||||
if config.Prometheus {
|
if config.Prometheus {
|
||||||
return cs.PrometheusMetrics(config.Namespace), p2p.PrometheusMetrics(config.Namespace),
|
return cs.PrometheusMetrics(config.Namespace), p2p.PrometheusMetrics(config.Namespace),
|
||||||
mempl.PrometheusMetrics(config.Namespace), sm.PrometheusMetrics(config.Namespace)
|
mempl.PrometheusMetrics(config.Namespace), sm.PrometheusMetrics(config.Namespace)
|
||||||
}
|
}
|
||||||
return cs.NopMetrics(), p2p.NopMetrics(), mempl.NopMetrics(), sm.NopMetrics()
|
return cs.NopMetrics(), p2p.NopMetrics(), mempl.NopMetrics(), sm.NopMetrics()
|
||||||
}
|
}
|
||||||
|
@ -56,7 +56,6 @@ func PrometheusMetrics(namespace string) *Metrics {
|
|||||||
Name: "num_txs",
|
Name: "num_txs",
|
||||||
Help: "Number of transactions submitted by each peer.",
|
Help: "Number of transactions submitted by each peer.",
|
||||||
}, []string{"peer_id"}),
|
}, []string{"peer_id"}),
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -49,7 +49,7 @@ func BlockExecutorWithMetrics(metrics *Metrics) BlockExecutorOption {
|
|||||||
// NewBlockExecutor returns a new BlockExecutor with a NopEventBus.
|
// NewBlockExecutor returns a new BlockExecutor with a NopEventBus.
|
||||||
// Call SetEventBus to provide one.
|
// Call SetEventBus to provide one.
|
||||||
func NewBlockExecutor(db dbm.DB, logger log.Logger, proxyApp proxy.AppConnConsensus,
|
func NewBlockExecutor(db dbm.DB, logger log.Logger, proxyApp proxy.AppConnConsensus,
|
||||||
mempool Mempool, evpool EvidencePool, options ...BlockExecutorOption) *BlockExecutor {
|
mempool Mempool, evpool EvidencePool, options ...BlockExecutorOption) *BlockExecutor {
|
||||||
res := &BlockExecutor{
|
res := &BlockExecutor{
|
||||||
db: db,
|
db: db,
|
||||||
proxyApp: proxyApp,
|
proxyApp: proxyApp,
|
||||||
@ -95,7 +95,7 @@ func (blockExec *BlockExecutor) ApplyBlock(state State, blockID types.BlockID, b
|
|||||||
startTime := time.Now().UnixNano()
|
startTime := time.Now().UnixNano()
|
||||||
abciResponses, err := execBlockOnProxyApp(blockExec.logger, blockExec.proxyApp, block, state.LastValidators, blockExec.db)
|
abciResponses, err := execBlockOnProxyApp(blockExec.logger, blockExec.proxyApp, block, state.LastValidators, blockExec.db)
|
||||||
endTime := time.Now().UnixNano()
|
endTime := time.Now().UnixNano()
|
||||||
blockExec.metrics.BlockProcessingTime.Observe(float64(endTime - startTime) / 1000000)
|
blockExec.metrics.BlockProcessingTime.Observe(float64(endTime-startTime) / 1000000)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return state, ErrProxyAppConn(err)
|
return state, ErrProxyAppConn(err)
|
||||||
}
|
}
|
||||||
@ -198,11 +198,11 @@ func (blockExec *BlockExecutor) Commit(
|
|||||||
// Executes block's transactions on proxyAppConn.
|
// Executes block's transactions on proxyAppConn.
|
||||||
// Returns a list of transaction results and updates to the validator set
|
// Returns a list of transaction results and updates to the validator set
|
||||||
func execBlockOnProxyApp(
|
func execBlockOnProxyApp(
|
||||||
logger log.Logger,
|
logger log.Logger,
|
||||||
proxyAppConn proxy.AppConnConsensus,
|
proxyAppConn proxy.AppConnConsensus,
|
||||||
block *types.Block,
|
block *types.Block,
|
||||||
lastValSet *types.ValidatorSet,
|
lastValSet *types.ValidatorSet,
|
||||||
stateDB dbm.DB,
|
stateDB dbm.DB,
|
||||||
) (*ABCIResponses, error) {
|
) (*ABCIResponses, error) {
|
||||||
var validTxs, invalidTxs = 0, 0
|
var validTxs, invalidTxs = 0, 0
|
||||||
|
|
||||||
@ -360,10 +360,10 @@ func updateValidators(currentSet *types.ValidatorSet, abciUpdates []abci.Validat
|
|||||||
|
|
||||||
// updateState returns a new State updated according to the header and responses.
|
// updateState returns a new State updated according to the header and responses.
|
||||||
func updateState(
|
func updateState(
|
||||||
state State,
|
state State,
|
||||||
blockID types.BlockID,
|
blockID types.BlockID,
|
||||||
header *types.Header,
|
header *types.Header,
|
||||||
abciResponses *ABCIResponses,
|
abciResponses *ABCIResponses,
|
||||||
) (State, error) {
|
) (State, error) {
|
||||||
|
|
||||||
// Copy the valset so we can apply changes from EndBlock
|
// Copy the valset so we can apply changes from EndBlock
|
||||||
@ -448,11 +448,11 @@ func fireEvents(logger log.Logger, eventBus types.BlockEventPublisher, block *ty
|
|||||||
// ExecCommitBlock executes and commits a block on the proxyApp without validating or mutating the state.
|
// ExecCommitBlock executes and commits a block on the proxyApp without validating or mutating the state.
|
||||||
// It returns the application root hash (result of abci.Commit).
|
// It returns the application root hash (result of abci.Commit).
|
||||||
func ExecCommitBlock(
|
func ExecCommitBlock(
|
||||||
appConnConsensus proxy.AppConnConsensus,
|
appConnConsensus proxy.AppConnConsensus,
|
||||||
block *types.Block,
|
block *types.Block,
|
||||||
logger log.Logger,
|
logger log.Logger,
|
||||||
lastValSet *types.ValidatorSet,
|
lastValSet *types.ValidatorSet,
|
||||||
stateDB dbm.DB,
|
stateDB dbm.DB,
|
||||||
) ([]byte, error) {
|
) ([]byte, error) {
|
||||||
_, err := execBlockOnProxyApp(logger, appConnConsensus, block, lastValSet, stateDB)
|
_, err := execBlockOnProxyApp(logger, appConnConsensus, block, lastValSet, stateDB)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -2,9 +2,9 @@ package state
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/go-kit/kit/metrics"
|
"github.com/go-kit/kit/metrics"
|
||||||
|
"github.com/go-kit/kit/metrics/discard"
|
||||||
"github.com/go-kit/kit/metrics/prometheus"
|
"github.com/go-kit/kit/metrics/prometheus"
|
||||||
stdprometheus "github.com/prometheus/client_golang/prometheus"
|
stdprometheus "github.com/prometheus/client_golang/prometheus"
|
||||||
"github.com/go-kit/kit/metrics/discard"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const MetricsSubsystem = "state"
|
const MetricsSubsystem = "state"
|
||||||
|
Loading…
x
Reference in New Issue
Block a user