use channels to send votes, ... from consensus state to reactor

Refs #847
This commit is contained in:
Anton Kaliaev
2018-05-15 14:32:06 +04:00
parent bf6527fc59
commit 21f5f3faa7
3 changed files with 57 additions and 54 deletions

View File

@ -264,7 +264,7 @@ func newConsensusStateWithConfigAndBlockStore(thisConfig *cfg.Config, state sm.S
// mock the evidence pool // mock the evidence pool
evpool := types.MockEvidencePool{} evpool := types.MockEvidencePool{}
// Make ConsensusReactor // Make ConsensusState
stateDB := dbm.NewMemDB() stateDB := dbm.NewMemDB()
blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyAppConnCon, mempool, evpool) blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyAppConnCon, mempool, evpool)
cs := NewConsensusState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool) cs := NewConsensusState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool)

View File

@ -1,7 +1,6 @@
package consensus package consensus
import ( import (
"context"
"fmt" "fmt"
"reflect" "reflect"
"sync" "sync"
@ -49,6 +48,12 @@ func NewConsensusReactor(consensusState *ConsensusState, fastSync bool) *Consens
conS: consensusState, conS: consensusState,
fastSync: fastSync, fastSync: fastSync,
} }
// XXX: modifing state to send us new round steps, votes and proposal heartbeats
consensusState.reactorChs = &reactorChs{
newRoundSteps: make(chan *cstypes.RoundState),
votes: make(chan *types.Vote),
proposalHeartbeats: make(chan *types.Heartbeat),
}
conR.BaseReactor = *p2p.NewBaseReactor("ConsensusReactor", conR) conR.BaseReactor = *p2p.NewBaseReactor("ConsensusReactor", conR)
return conR return conR
} }
@ -345,60 +350,22 @@ func (conR *ConsensusReactor) FastSync() bool {
//-------------------------------------- //--------------------------------------
// startBroadcastRoutine subscribes for new round steps, votes and proposal // startBroadcastRoutine subscribes for new round steps, votes and
// heartbeats using the event bus and starts a go routine to broadcasts events // proposal heartbeats using the channels created for precisely this
// to peers upon receiving them. // purpose in consensus state and starts a goroutine to broadcasts
// events to peers upon receiving them.
func (conR *ConsensusReactor) startBroadcastRoutine() error { func (conR *ConsensusReactor) startBroadcastRoutine() error {
const subscriber = "consensus-reactor"
ctx := context.Background()
// new round steps
stepsCh := make(chan interface{})
err := conR.eventBus.Subscribe(ctx, subscriber, types.EventQueryNewRoundStep, stepsCh)
if err != nil {
return errors.Wrapf(err, "failed to subscribe %s to %s", subscriber, types.EventQueryNewRoundStep)
}
// votes
votesCh := make(chan interface{})
err = conR.eventBus.Subscribe(ctx, subscriber, types.EventQueryVote, votesCh)
if err != nil {
return errors.Wrapf(err, "failed to subscribe %s to %s", subscriber, types.EventQueryVote)
}
// proposal heartbeats
heartbeatsCh := make(chan interface{})
err = conR.eventBus.Subscribe(ctx, subscriber, types.EventQueryProposalHeartbeat, heartbeatsCh)
if err != nil {
return errors.Wrapf(err, "failed to subscribe %s to %s", subscriber, types.EventQueryProposalHeartbeat)
}
go func() { go func() {
var data interface{} rchs := conR.conS.reactorChs.(*reactorChs)
var ok bool
for { for {
select { select {
case data, ok = <-stepsCh: case rs := <-rchs.newRoundSteps:
if ok { // a receive from a closed channel returns the zero value immediately conR.broadcastNewRoundStepMessage(rs)
edrs := data.(types.EventDataRoundState) case vote := <-rchs.votes:
conR.broadcastNewRoundStep(edrs.RoundState.(*cstypes.RoundState)) conR.broadcastHasVoteMessage(vote)
} case heartbeat := <-rchs.proposalHeartbeats:
case data, ok = <-votesCh: conR.broadcastProposalHeartbeat(heartbeat)
if ok {
edv := data.(types.EventDataVote)
conR.broadcastHasVoteMessage(edv.Vote)
}
case data, ok = <-heartbeatsCh:
if ok {
edph := data.(types.EventDataProposalHeartbeat)
conR.broadcastProposalHeartbeatMessage(edph)
}
case <-conR.Quit(): case <-conR.Quit():
conR.eventBus.UnsubscribeAll(ctx, subscriber)
return
}
if !ok {
conR.eventBus.UnsubscribeAll(ctx, subscriber)
return return
} }
} }
@ -407,15 +374,14 @@ func (conR *ConsensusReactor) startBroadcastRoutine() error {
return nil return nil
} }
func (conR *ConsensusReactor) broadcastProposalHeartbeatMessage(heartbeat types.EventDataProposalHeartbeat) { func (conR *ConsensusReactor) broadcastProposalHeartbeat(hb *types.Heartbeat) {
hb := heartbeat.Heartbeat
conR.Logger.Debug("Broadcasting proposal heartbeat message", conR.Logger.Debug("Broadcasting proposal heartbeat message",
"height", hb.Height, "round", hb.Round, "sequence", hb.Sequence) "height", hb.Height, "round", hb.Round, "sequence", hb.Sequence)
msg := &ProposalHeartbeatMessage{hb} msg := &ProposalHeartbeatMessage{hb}
conR.Switch.Broadcast(StateChannel, cdc.MustMarshalBinaryBare(msg)) conR.Switch.Broadcast(StateChannel, cdc.MustMarshalBinaryBare(msg))
} }
func (conR *ConsensusReactor) broadcastNewRoundStep(rs *cstypes.RoundState) { func (conR *ConsensusReactor) broadcastNewRoundStepMessage(rs *cstypes.RoundState) {
nrsMsg, csMsg := makeRoundStepMessages(rs) nrsMsg, csMsg := makeRoundStepMessages(rs)
if nrsMsg != nil { if nrsMsg != nil {
conR.Switch.Broadcast(StateChannel, cdc.MustMarshalBinaryBare(nrsMsg)) conR.Switch.Broadcast(StateChannel, cdc.MustMarshalBinaryBare(nrsMsg))

View File

@ -110,8 +110,40 @@ type ConsensusState struct {
// closed when we finish shutting down // closed when we finish shutting down
done chan struct{} done chan struct{}
// synchronous pubsub between consensus state and reactor
// only set when there is a reactor
reactorChs reactorChsI
} }
type reactorChsI interface {
NewRoundStep(*cstypes.RoundState)
Vote(*types.Vote)
ProposalHeartbeat(*types.Heartbeat)
}
// A list of channels to send new round steps, votes and proposal heartbeats to.
type reactorChs struct {
newRoundSteps chan (*cstypes.RoundState)
votes chan (*types.Vote)
proposalHeartbeats chan (*types.Heartbeat)
}
var _ reactorChsI = (*reactorChs)(nil)
// BLOCKING
func (rchs *reactorChs) NewRoundStep(rs *cstypes.RoundState) { rchs.newRoundSteps <- rs }
func (rchs *reactorChs) Vote(vote *types.Vote) { rchs.votes <- vote }
func (rchs *reactorChs) ProposalHeartbeat(hb *types.Heartbeat) { rchs.proposalHeartbeats <- hb }
type nilReactorChs struct{}
var _ reactorChsI = nilReactorChs{}
func (nilReactorChs) NewRoundStep(rs *cstypes.RoundState) {}
func (nilReactorChs) Vote(vote *types.Vote) {}
func (nilReactorChs) ProposalHeartbeat(hb *types.Heartbeat) {}
// NewConsensusState returns a new ConsensusState. // NewConsensusState returns a new ConsensusState.
func NewConsensusState(config *cfg.ConsensusConfig, state sm.State, blockExec *sm.BlockExecutor, blockStore types.BlockStore, mempool types.Mempool, evpool types.EvidencePool) *ConsensusState { func NewConsensusState(config *cfg.ConsensusConfig, state sm.State, blockExec *sm.BlockExecutor, blockStore types.BlockStore, mempool types.Mempool, evpool types.EvidencePool) *ConsensusState {
cs := &ConsensusState{ cs := &ConsensusState{
@ -126,6 +158,7 @@ func NewConsensusState(config *cfg.ConsensusConfig, state sm.State, blockExec *s
doWALCatchup: true, doWALCatchup: true,
wal: nilWAL{}, wal: nilWAL{},
evpool: evpool, evpool: evpool,
reactorChs: nilReactorChs{},
} }
// set function defaults (may be overwritten before calling Start) // set function defaults (may be overwritten before calling Start)
cs.decideProposal = cs.defaultDecideProposal cs.decideProposal = cs.defaultDecideProposal
@ -509,6 +542,7 @@ func (cs *ConsensusState) newStep() {
// newStep is called by updateToStep in NewConsensusState before the eventBus is set! // newStep is called by updateToStep in NewConsensusState before the eventBus is set!
if cs.eventBus != nil { if cs.eventBus != nil {
cs.eventBus.PublishEventNewRoundStep(rs) cs.eventBus.PublishEventNewRoundStep(rs)
cs.reactorChs.NewRoundStep(&cs.RoundState)
} }
} }
@ -752,6 +786,7 @@ func (cs *ConsensusState) proposalHeartbeat(height int64, round int) {
} }
cs.privValidator.SignHeartbeat(chainID, heartbeat) cs.privValidator.SignHeartbeat(chainID, heartbeat)
cs.eventBus.PublishEventProposalHeartbeat(types.EventDataProposalHeartbeat{heartbeat}) cs.eventBus.PublishEventProposalHeartbeat(types.EventDataProposalHeartbeat{heartbeat})
cs.reactorChs.ProposalHeartbeat(heartbeat)
counter++ counter++
time.Sleep(proposalHeartbeatIntervalSeconds * time.Second) time.Sleep(proposalHeartbeatIntervalSeconds * time.Second)
} }
@ -1418,6 +1453,7 @@ func (cs *ConsensusState) addVote(vote *types.Vote, peerID p2p.ID) (added bool,
cs.Logger.Info(cmn.Fmt("Added to lastPrecommits: %v", cs.LastCommit.StringShort())) cs.Logger.Info(cmn.Fmt("Added to lastPrecommits: %v", cs.LastCommit.StringShort()))
cs.eventBus.PublishEventVote(types.EventDataVote{vote}) cs.eventBus.PublishEventVote(types.EventDataVote{vote})
cs.reactorChs.Vote(vote)
// if we can skip timeoutCommit and have all the votes now, // if we can skip timeoutCommit and have all the votes now,
if cs.config.SkipTimeoutCommit && cs.LastCommit.HasAll() { if cs.config.SkipTimeoutCommit && cs.LastCommit.HasAll() {
@ -1445,6 +1481,7 @@ func (cs *ConsensusState) addVote(vote *types.Vote, peerID p2p.ID) (added bool,
} }
cs.eventBus.PublishEventVote(types.EventDataVote{vote}) cs.eventBus.PublishEventVote(types.EventDataVote{vote})
cs.reactorChs.Vote(vote)
switch vote.Type { switch vote.Type {
case types.VoteTypePrevote: case types.VoteTypePrevote: