mirror of
https://github.com/fluencelabs/tendermint
synced 2025-04-26 15:22:15 +00:00
consensus: fixes #1754
* updateToState exits early if the state isn't new, which happens after * fast syncing. This results in not sending a NewRoundStep message. The mempool * reactor depends on PeerState, which is updated by NewRoundStep * messages. If the peer never sends a NewRoundStep, the mempool reactor * will think they're behind, and never forward transactions. Note this * only happens when `create_empty_blocks = false`, because otherwise * peers will move through the consensus state and send a NewRoundStep * for a new step soon anyways. Simple fix is just to send the * NewRoundStep message during updateToState even if exit early
This commit is contained in:
parent
c84be3b8dd
commit
a519825bf8
@ -460,9 +460,12 @@ func (cs *ConsensusState) updateToState(state sm.State) {
|
|||||||
|
|
||||||
// If state isn't further out than cs.state, just ignore.
|
// If state isn't further out than cs.state, just ignore.
|
||||||
// This happens when SwitchToConsensus() is called in the reactor.
|
// This happens when SwitchToConsensus() is called in the reactor.
|
||||||
// We don't want to reset e.g. the Votes.
|
// We don't want to reset e.g. the Votes, but we still want to
|
||||||
|
// signal the new round step, because other services (eg. mempool)
|
||||||
|
// depend on having an up-to-date peer state!
|
||||||
if !cs.state.IsEmpty() && (state.LastBlockHeight <= cs.state.LastBlockHeight) {
|
if !cs.state.IsEmpty() && (state.LastBlockHeight <= cs.state.LastBlockHeight) {
|
||||||
cs.Logger.Info("Ignoring updateToState()", "newHeight", state.LastBlockHeight+1, "oldHeight", cs.state.LastBlockHeight+1)
|
cs.Logger.Info("Ignoring updateToState()", "newHeight", state.LastBlockHeight+1, "oldHeight", cs.state.LastBlockHeight+1)
|
||||||
|
cs.newStep()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -492,6 +495,7 @@ func (cs *ConsensusState) updateToState(state sm.State) {
|
|||||||
} else {
|
} else {
|
||||||
cs.StartTime = cs.config.Commit(cs.CommitTime)
|
cs.StartTime = cs.config.Commit(cs.CommitTime)
|
||||||
}
|
}
|
||||||
|
|
||||||
cs.Validators = validators
|
cs.Validators = validators
|
||||||
cs.Proposal = nil
|
cs.Proposal = nil
|
||||||
cs.ProposalBlock = nil
|
cs.ProposalBlock = nil
|
||||||
@ -517,7 +521,7 @@ func (cs *ConsensusState) newStep() {
|
|||||||
rs := cs.RoundStateEvent()
|
rs := cs.RoundStateEvent()
|
||||||
cs.wal.Write(rs)
|
cs.wal.Write(rs)
|
||||||
cs.nSteps++
|
cs.nSteps++
|
||||||
// newStep is called by updateToStep in NewConsensusState before the eventBus is set!
|
// newStep is called by updateToState in NewConsensusState before the eventBus is set!
|
||||||
if cs.eventBus != nil {
|
if cs.eventBus != nil {
|
||||||
cs.eventBus.PublishEventNewRoundStep(rs)
|
cs.eventBus.PublishEventNewRoundStep(rs)
|
||||||
cs.evsw.FireEvent(types.EventNewRoundStep, &cs.RoundState)
|
cs.evsw.FireEvent(types.EventNewRoundStep, &cs.RoundState)
|
||||||
|
@ -328,11 +328,11 @@ func (mem *Mempool) notifyTxsAvailable() {
|
|||||||
panic("notified txs available but mempool is empty!")
|
panic("notified txs available but mempool is empty!")
|
||||||
}
|
}
|
||||||
if mem.txsAvailable != nil && !mem.notifiedTxsAvailable {
|
if mem.txsAvailable != nil && !mem.notifiedTxsAvailable {
|
||||||
|
// channel cap is 1, so this will send once
|
||||||
select {
|
select {
|
||||||
case mem.txsAvailable <- mem.height + 1:
|
case mem.txsAvailable <- mem.height + 1:
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
mem.notifiedTxsAvailable = true
|
mem.notifiedTxsAvailable = true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -103,6 +103,7 @@ type PeerState interface {
|
|||||||
// Send new mempool txs to peer.
|
// Send new mempool txs to peer.
|
||||||
func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) {
|
func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) {
|
||||||
if !memR.config.Broadcast {
|
if !memR.config.Broadcast {
|
||||||
|
memR.Logger.Info("Tx broadcasting is disabled")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -129,7 +130,8 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer p2p.Peer) {
|
|||||||
height := memTx.Height()
|
height := memTx.Height()
|
||||||
if peerState_i := peer.Get(types.PeerStateKey); peerState_i != nil {
|
if peerState_i := peer.Get(types.PeerStateKey); peerState_i != nil {
|
||||||
peerState := peerState_i.(PeerState)
|
peerState := peerState_i.(PeerState)
|
||||||
if peerState.GetHeight() < height-1 { // Allow for a lag of 1 block
|
peerHeight := peerState.GetHeight()
|
||||||
|
if peerHeight < height-1 { // Allow for a lag of 1 block
|
||||||
time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
|
time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -3,5 +3,4 @@ package types
|
|||||||
// UNSTABLE
|
// UNSTABLE
|
||||||
var (
|
var (
|
||||||
PeerStateKey = "ConsensusReactor.peerState"
|
PeerStateKey = "ConsensusReactor.peerState"
|
||||||
PeerMempoolChKey = "MempoolReactor.peerMempoolCh"
|
|
||||||
)
|
)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user