mirror of
https://github.com/fluencelabs/tendermint
synced 2025-04-25 06:42:16 +00:00
switch to events package
This commit is contained in:
parent
b77d5344fc
commit
c4fef499b6
3
Gopkg.lock
generated
3
Gopkg.lock
generated
@ -278,6 +278,7 @@
|
||||
"clist",
|
||||
"common",
|
||||
"db",
|
||||
"events",
|
||||
"flowrate",
|
||||
"log",
|
||||
"merkle",
|
||||
@ -384,6 +385,6 @@
|
||||
[solve-meta]
|
||||
analyzer-name = "dep"
|
||||
analyzer-version = 1
|
||||
inputs-digest = "52a0dcbebdf8714612444914cfce59a3af8c47c4453a2d43c4ccc5ff1a91d8ea"
|
||||
inputs-digest = "a88c20b6e36b3529d6fdcffc3603d9eb193fc3809de8afbba07bad990539b256"
|
||||
solver-name = "gps-cdcl"
|
||||
solver-version = 1
|
||||
|
@ -10,6 +10,7 @@ import (
|
||||
|
||||
amino "github.com/tendermint/go-amino"
|
||||
cmn "github.com/tendermint/tmlibs/common"
|
||||
tmevents "github.com/tendermint/tmlibs/events"
|
||||
"github.com/tendermint/tmlibs/log"
|
||||
|
||||
cstypes "github.com/tendermint/tendermint/consensus/types"
|
||||
@ -48,12 +49,6 @@ func NewConsensusReactor(consensusState *ConsensusState, fastSync bool) *Consens
|
||||
conS: consensusState,
|
||||
fastSync: fastSync,
|
||||
}
|
||||
// XXX: modifing state to send us new round steps, votes and proposal heartbeats
|
||||
consensusState.reactorChs = &reactorChs{
|
||||
newRoundSteps: make(chan *cstypes.RoundState),
|
||||
votes: make(chan *types.Vote),
|
||||
proposalHeartbeats: make(chan *types.Heartbeat),
|
||||
}
|
||||
conR.BaseReactor = *p2p.NewBaseReactor("ConsensusReactor", conR)
|
||||
return conR
|
||||
}
|
||||
@ -65,16 +60,12 @@ func (conR *ConsensusReactor) OnStart() error {
|
||||
return err
|
||||
}
|
||||
|
||||
err := conR.startBroadcastRoutine()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !conR.FastSync() {
|
||||
err := conR.conS.Start()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
conR.subscribeToBroadcastEvents()
|
||||
}
|
||||
|
||||
return nil
|
||||
@ -106,7 +97,9 @@ func (conR *ConsensusReactor) SwitchToConsensus(state sm.State, blocksSynced int
|
||||
err := conR.conS.Start()
|
||||
if err != nil {
|
||||
conR.Logger.Error("Error starting conS", "err", err)
|
||||
return
|
||||
}
|
||||
conR.subscribeToBroadcastEvents()
|
||||
}
|
||||
|
||||
// GetChannels implements Reactor
|
||||
@ -350,28 +343,30 @@ func (conR *ConsensusReactor) FastSync() bool {
|
||||
|
||||
//--------------------------------------
|
||||
|
||||
// startBroadcastRoutine subscribes for new round steps, votes and
|
||||
// proposal heartbeats using the channels created for precisely this
|
||||
// purpose in consensus state and starts a goroutine to broadcasts
|
||||
// events to peers upon receiving them.
|
||||
func (conR *ConsensusReactor) startBroadcastRoutine() error {
|
||||
go func() {
|
||||
rchs := conR.conS.reactorChs.(*reactorChs)
|
||||
for {
|
||||
select {
|
||||
case rs := <-rchs.newRoundSteps:
|
||||
conR.broadcastNewRoundStepMessages(rs)
|
||||
case vote := <-rchs.votes:
|
||||
conR.broadcastHasVoteMessage(vote)
|
||||
case heartbeat := <-rchs.proposalHeartbeats:
|
||||
conR.broadcastProposalHeartbeatMessage(heartbeat)
|
||||
case <-conR.Quit():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
// subscribeToBroadcastEvents subscribes for new round steps, votes and
|
||||
// proposal heartbeats using internal pubsub defined on state to broadcast
|
||||
// them to peers upon receiving.
|
||||
func (conR *ConsensusReactor) subscribeToBroadcastEvents() {
|
||||
// assert consensus state is running
|
||||
if !conR.conS.IsRunning() {
|
||||
panic("consensus state must be running at this point")
|
||||
}
|
||||
|
||||
return nil
|
||||
const subscriber = "consensus-reactor"
|
||||
conR.conS.evsw.AddListenerForEvent(subscriber, types.EventNewRoundStep,
|
||||
func(data tmevents.EventData) {
|
||||
conR.broadcastNewRoundStepMessages(data.(*cstypes.RoundState))
|
||||
})
|
||||
|
||||
conR.conS.evsw.AddListenerForEvent(subscriber, types.EventVote,
|
||||
func(data tmevents.EventData) {
|
||||
conR.broadcastHasVoteMessage(data.(*types.Vote))
|
||||
})
|
||||
|
||||
conR.conS.evsw.AddListenerForEvent(subscriber, types.EventProposalHeartbeat,
|
||||
func(data tmevents.EventData) {
|
||||
conR.broadcastProposalHeartbeatMessage(data.(*types.Heartbeat))
|
||||
})
|
||||
}
|
||||
|
||||
func (conR *ConsensusReactor) broadcastProposalHeartbeatMessage(hb *types.Heartbeat) {
|
||||
|
@ -11,6 +11,7 @@ import (
|
||||
|
||||
fail "github.com/ebuchman/fail-test"
|
||||
cmn "github.com/tendermint/tmlibs/common"
|
||||
tmevents "github.com/tendermint/tmlibs/events"
|
||||
"github.com/tendermint/tmlibs/log"
|
||||
|
||||
cfg "github.com/tendermint/tendermint/config"
|
||||
@ -111,39 +112,11 @@ type ConsensusState struct {
|
||||
// closed when we finish shutting down
|
||||
done chan struct{}
|
||||
|
||||
// synchronous pubsub between consensus state and reactor
|
||||
// only set when there is a reactor
|
||||
reactorChs reactorChsI
|
||||
// synchronous pubsub between consensus state and reactor.
|
||||
// state only emits EventNewRoundStep, EventVote and EventProposalHeartbeat
|
||||
evsw tmevents.EventSwitch
|
||||
}
|
||||
|
||||
type reactorChsI interface {
|
||||
NewRoundStep(*cstypes.RoundState)
|
||||
Vote(*types.Vote)
|
||||
ProposalHeartbeat(*types.Heartbeat)
|
||||
}
|
||||
|
||||
// A list of channels to send new round steps, votes and proposal heartbeats to.
|
||||
type reactorChs struct {
|
||||
newRoundSteps chan (*cstypes.RoundState)
|
||||
votes chan (*types.Vote)
|
||||
proposalHeartbeats chan (*types.Heartbeat)
|
||||
}
|
||||
|
||||
var _ reactorChsI = (*reactorChs)(nil)
|
||||
|
||||
// BLOCKING
|
||||
func (rchs *reactorChs) NewRoundStep(rs *cstypes.RoundState) { rchs.newRoundSteps <- rs }
|
||||
func (rchs *reactorChs) Vote(vote *types.Vote) { rchs.votes <- vote }
|
||||
func (rchs *reactorChs) ProposalHeartbeat(hb *types.Heartbeat) { rchs.proposalHeartbeats <- hb }
|
||||
|
||||
type nilReactorChs struct{}
|
||||
|
||||
var _ reactorChsI = nilReactorChs{}
|
||||
|
||||
func (nilReactorChs) NewRoundStep(rs *cstypes.RoundState) {}
|
||||
func (nilReactorChs) Vote(vote *types.Vote) {}
|
||||
func (nilReactorChs) ProposalHeartbeat(hb *types.Heartbeat) {}
|
||||
|
||||
// NewConsensusState returns a new ConsensusState.
|
||||
func NewConsensusState(config *cfg.ConsensusConfig, state sm.State, blockExec *sm.BlockExecutor, blockStore types.BlockStore, mempool types.Mempool, evpool types.EvidencePool) *ConsensusState {
|
||||
cs := &ConsensusState{
|
||||
@ -158,7 +131,7 @@ func NewConsensusState(config *cfg.ConsensusConfig, state sm.State, blockExec *s
|
||||
doWALCatchup: true,
|
||||
wal: nilWAL{},
|
||||
evpool: evpool,
|
||||
reactorChs: nilReactorChs{},
|
||||
evsw: tmevents.NewEventSwitch(),
|
||||
}
|
||||
// set function defaults (may be overwritten before calling Start)
|
||||
cs.decideProposal = cs.defaultDecideProposal
|
||||
@ -260,6 +233,10 @@ func (cs *ConsensusState) LoadCommit(height int64) *types.Commit {
|
||||
// OnStart implements cmn.Service.
|
||||
// It loads the latest state via the WAL, and starts the timeout and receive routines.
|
||||
func (cs *ConsensusState) OnStart() error {
|
||||
if err := cs.evsw.Start(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// we may set the WAL in testing before calling Start,
|
||||
// so only OpenWAL if its still the nilWAL
|
||||
if _, ok := cs.wal.(nilWAL); ok {
|
||||
@ -277,8 +254,7 @@ func (cs *ConsensusState) OnStart() error {
|
||||
// NOTE: we will get a build up of garbage go routines
|
||||
// firing on the tockChan until the receiveRoutine is started
|
||||
// to deal with them (by that point, at most one will be valid)
|
||||
err := cs.timeoutTicker.Start()
|
||||
if err != nil {
|
||||
if err := cs.timeoutTicker.Start(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -317,6 +293,8 @@ func (cs *ConsensusState) startRoutines(maxSteps int) {
|
||||
func (cs *ConsensusState) OnStop() {
|
||||
cs.BaseService.OnStop()
|
||||
|
||||
cs.evsw.Stop()
|
||||
|
||||
cs.timeoutTicker.Stop()
|
||||
|
||||
// Make BaseService.Wait() wait until cs.wal.Wait()
|
||||
@ -542,7 +520,7 @@ func (cs *ConsensusState) newStep() {
|
||||
// newStep is called by updateToStep in NewConsensusState before the eventBus is set!
|
||||
if cs.eventBus != nil {
|
||||
cs.eventBus.PublishEventNewRoundStep(rs)
|
||||
cs.reactorChs.NewRoundStep(&cs.RoundState)
|
||||
cs.evsw.FireEvent(types.EventNewRoundStep, &cs.RoundState)
|
||||
}
|
||||
}
|
||||
|
||||
@ -786,7 +764,7 @@ func (cs *ConsensusState) proposalHeartbeat(height int64, round int) {
|
||||
}
|
||||
cs.privValidator.SignHeartbeat(chainID, heartbeat)
|
||||
cs.eventBus.PublishEventProposalHeartbeat(types.EventDataProposalHeartbeat{heartbeat})
|
||||
cs.reactorChs.ProposalHeartbeat(heartbeat)
|
||||
cs.evsw.FireEvent(types.EventProposalHeartbeat, heartbeat)
|
||||
counter++
|
||||
time.Sleep(proposalHeartbeatIntervalSeconds * time.Second)
|
||||
}
|
||||
@ -1453,7 +1431,7 @@ func (cs *ConsensusState) addVote(vote *types.Vote, peerID p2p.ID) (added bool,
|
||||
|
||||
cs.Logger.Info(cmn.Fmt("Added to lastPrecommits: %v", cs.LastCommit.StringShort()))
|
||||
cs.eventBus.PublishEventVote(types.EventDataVote{vote})
|
||||
cs.reactorChs.Vote(vote)
|
||||
cs.evsw.FireEvent(types.EventVote, vote)
|
||||
|
||||
// if we can skip timeoutCommit and have all the votes now,
|
||||
if cs.config.SkipTimeoutCommit && cs.LastCommit.HasAll() {
|
||||
@ -1481,7 +1459,7 @@ func (cs *ConsensusState) addVote(vote *types.Vote, peerID p2p.ID) (added bool,
|
||||
}
|
||||
|
||||
cs.eventBus.PublishEventVote(types.EventDataVote{vote})
|
||||
cs.reactorChs.Vote(vote)
|
||||
cs.evsw.FireEvent(types.EventVote, vote)
|
||||
|
||||
switch vote.Type {
|
||||
case types.VoteTypePrevote:
|
||||
|
Loading…
x
Reference in New Issue
Block a user