diff --git a/consensus/common_test.go b/consensus/common_test.go index 4ddd6b8a..3eaeea70 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -264,7 +264,7 @@ func newConsensusStateWithConfigAndBlockStore(thisConfig *cfg.Config, state sm.S // mock the evidence pool evpool := types.MockEvidencePool{} - // Make ConsensusReactor + // Make ConsensusState stateDB := dbm.NewMemDB() blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyAppConnCon, mempool, evpool) cs := NewConsensusState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool) diff --git a/consensus/reactor.go b/consensus/reactor.go index 9535108c..1193ed72 100644 --- a/consensus/reactor.go +++ b/consensus/reactor.go @@ -1,7 +1,6 @@ package consensus import ( - "context" "fmt" "reflect" "sync" @@ -49,6 +48,12 @@ func NewConsensusReactor(consensusState *ConsensusState, fastSync bool) *Consens conS: consensusState, fastSync: fastSync, } + // XXX: modifing state to send us new round steps, votes and proposal heartbeats + consensusState.reactorChs = &reactorChs{ + newRoundSteps: make(chan *cstypes.RoundState), + votes: make(chan *types.Vote), + proposalHeartbeats: make(chan *types.Heartbeat), + } conR.BaseReactor = *p2p.NewBaseReactor("ConsensusReactor", conR) return conR } @@ -345,60 +350,22 @@ func (conR *ConsensusReactor) FastSync() bool { //-------------------------------------- -// startBroadcastRoutine subscribes for new round steps, votes and proposal -// heartbeats using the event bus and starts a go routine to broadcasts events -// to peers upon receiving them. +// startBroadcastRoutine subscribes for new round steps, votes and +// proposal heartbeats using the channels created for precisely this +// purpose in consensus state and starts a goroutine to broadcasts +// events to peers upon receiving them. func (conR *ConsensusReactor) startBroadcastRoutine() error { - const subscriber = "consensus-reactor" - ctx := context.Background() - - // new round steps - stepsCh := make(chan interface{}) - err := conR.eventBus.Subscribe(ctx, subscriber, types.EventQueryNewRoundStep, stepsCh) - if err != nil { - return errors.Wrapf(err, "failed to subscribe %s to %s", subscriber, types.EventQueryNewRoundStep) - } - - // votes - votesCh := make(chan interface{}) - err = conR.eventBus.Subscribe(ctx, subscriber, types.EventQueryVote, votesCh) - if err != nil { - return errors.Wrapf(err, "failed to subscribe %s to %s", subscriber, types.EventQueryVote) - } - - // proposal heartbeats - heartbeatsCh := make(chan interface{}) - err = conR.eventBus.Subscribe(ctx, subscriber, types.EventQueryProposalHeartbeat, heartbeatsCh) - if err != nil { - return errors.Wrapf(err, "failed to subscribe %s to %s", subscriber, types.EventQueryProposalHeartbeat) - } - go func() { - var data interface{} - var ok bool + rchs := conR.conS.reactorChs.(*reactorChs) for { select { - case data, ok = <-stepsCh: - if ok { // a receive from a closed channel returns the zero value immediately - edrs := data.(types.EventDataRoundState) - conR.broadcastNewRoundStep(edrs.RoundState.(*cstypes.RoundState)) - } - case data, ok = <-votesCh: - if ok { - edv := data.(types.EventDataVote) - conR.broadcastHasVoteMessage(edv.Vote) - } - case data, ok = <-heartbeatsCh: - if ok { - edph := data.(types.EventDataProposalHeartbeat) - conR.broadcastProposalHeartbeatMessage(edph) - } + case rs := <-rchs.newRoundSteps: + conR.broadcastNewRoundStepMessage(rs) + case vote := <-rchs.votes: + conR.broadcastHasVoteMessage(vote) + case heartbeat := <-rchs.proposalHeartbeats: + conR.broadcastProposalHeartbeat(heartbeat) case <-conR.Quit(): - conR.eventBus.UnsubscribeAll(ctx, subscriber) - return - } - if !ok { - conR.eventBus.UnsubscribeAll(ctx, subscriber) return } } @@ -407,15 +374,14 @@ func (conR *ConsensusReactor) startBroadcastRoutine() error { return nil } -func (conR *ConsensusReactor) broadcastProposalHeartbeatMessage(heartbeat types.EventDataProposalHeartbeat) { - hb := heartbeat.Heartbeat +func (conR *ConsensusReactor) broadcastProposalHeartbeat(hb *types.Heartbeat) { conR.Logger.Debug("Broadcasting proposal heartbeat message", "height", hb.Height, "round", hb.Round, "sequence", hb.Sequence) msg := &ProposalHeartbeatMessage{hb} conR.Switch.Broadcast(StateChannel, cdc.MustMarshalBinaryBare(msg)) } -func (conR *ConsensusReactor) broadcastNewRoundStep(rs *cstypes.RoundState) { +func (conR *ConsensusReactor) broadcastNewRoundStepMessage(rs *cstypes.RoundState) { nrsMsg, csMsg := makeRoundStepMessages(rs) if nrsMsg != nil { conR.Switch.Broadcast(StateChannel, cdc.MustMarshalBinaryBare(nrsMsg)) diff --git a/consensus/state.go b/consensus/state.go index b5b94368..5668ea82 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -110,8 +110,40 @@ type ConsensusState struct { // closed when we finish shutting down done chan struct{} + + // synchronous pubsub between consensus state and reactor + // only set when there is a reactor + reactorChs reactorChsI } +type reactorChsI interface { + NewRoundStep(*cstypes.RoundState) + Vote(*types.Vote) + ProposalHeartbeat(*types.Heartbeat) +} + +// A list of channels to send new round steps, votes and proposal heartbeats to. +type reactorChs struct { + newRoundSteps chan (*cstypes.RoundState) + votes chan (*types.Vote) + proposalHeartbeats chan (*types.Heartbeat) +} + +var _ reactorChsI = (*reactorChs)(nil) + +// BLOCKING +func (rchs *reactorChs) NewRoundStep(rs *cstypes.RoundState) { rchs.newRoundSteps <- rs } +func (rchs *reactorChs) Vote(vote *types.Vote) { rchs.votes <- vote } +func (rchs *reactorChs) ProposalHeartbeat(hb *types.Heartbeat) { rchs.proposalHeartbeats <- hb } + +type nilReactorChs struct{} + +var _ reactorChsI = nilReactorChs{} + +func (nilReactorChs) NewRoundStep(rs *cstypes.RoundState) {} +func (nilReactorChs) Vote(vote *types.Vote) {} +func (nilReactorChs) ProposalHeartbeat(hb *types.Heartbeat) {} + // NewConsensusState returns a new ConsensusState. func NewConsensusState(config *cfg.ConsensusConfig, state sm.State, blockExec *sm.BlockExecutor, blockStore types.BlockStore, mempool types.Mempool, evpool types.EvidencePool) *ConsensusState { cs := &ConsensusState{ @@ -126,6 +158,7 @@ func NewConsensusState(config *cfg.ConsensusConfig, state sm.State, blockExec *s doWALCatchup: true, wal: nilWAL{}, evpool: evpool, + reactorChs: nilReactorChs{}, } // set function defaults (may be overwritten before calling Start) cs.decideProposal = cs.defaultDecideProposal @@ -509,6 +542,7 @@ func (cs *ConsensusState) newStep() { // newStep is called by updateToStep in NewConsensusState before the eventBus is set! if cs.eventBus != nil { cs.eventBus.PublishEventNewRoundStep(rs) + cs.reactorChs.NewRoundStep(&cs.RoundState) } } @@ -752,6 +786,7 @@ func (cs *ConsensusState) proposalHeartbeat(height int64, round int) { } cs.privValidator.SignHeartbeat(chainID, heartbeat) cs.eventBus.PublishEventProposalHeartbeat(types.EventDataProposalHeartbeat{heartbeat}) + cs.reactorChs.ProposalHeartbeat(heartbeat) counter++ time.Sleep(proposalHeartbeatIntervalSeconds * time.Second) } @@ -1418,6 +1453,7 @@ func (cs *ConsensusState) addVote(vote *types.Vote, peerID p2p.ID) (added bool, cs.Logger.Info(cmn.Fmt("Added to lastPrecommits: %v", cs.LastCommit.StringShort())) cs.eventBus.PublishEventVote(types.EventDataVote{vote}) + cs.reactorChs.Vote(vote) // if we can skip timeoutCommit and have all the votes now, if cs.config.SkipTimeoutCommit && cs.LastCommit.HasAll() { @@ -1445,6 +1481,7 @@ func (cs *ConsensusState) addVote(vote *types.Vote, peerID p2p.ID) (added bool, } cs.eventBus.PublishEventVote(types.EventDataVote{vote}) + cs.reactorChs.Vote(vote) switch vote.Type { case types.VoteTypePrevote: