Merge remote-tracking branch 'origin/consensus_refactor' into consensus_refactor_jae

This commit is contained in:
Jae Kwon
2015-12-14 09:33:11 -08:00
12 changed files with 650 additions and 754 deletions

View File

@ -34,7 +34,7 @@ type ConsensusReactor struct {
blockStore *bc.BlockStore
conS *ConsensusState
fastSync bool
evsw events.Fireable
evsw *events.EventSwitch
}
func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore, fastSync bool) *ConsensusReactor {
@ -50,13 +50,17 @@ func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockSto
func (conR *ConsensusReactor) OnStart() error {
log.Notice("ConsensusReactor ", "fastSync", conR.fastSync)
conR.BaseReactor.OnStart()
// callbacks for broadcasting new steps and votes to peers
// upon their respective events (ie. uses evsw)
conR.registerEventCallbacks()
if !conR.fastSync {
_, err := conR.conS.Start()
if err != nil {
return err
}
}
go conR.broadcastNewRoundStepRoutine()
return nil
}
@ -134,7 +138,7 @@ func (conR *ConsensusReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
// NOTE: We process these messages even when we're fast_syncing.
// Messages affect either a peer state or the consensus state.
// Peer state updates can happen in parallel, but processing of
// proposals, block parts, and votes are ordered.
// proposals, block parts, and votes are ordered by the receiveRoutine
func (conR *ConsensusReactor) Receive(chID byte, peer *p2p.Peer, msgBytes []byte) {
if !conR.IsRunning() {
log.Debug("Receive", "channel", chID, "peer", peer, "bytes", msgBytes)
@ -213,6 +217,45 @@ func (conR *ConsensusReactor) Receive(chID byte, peer *p2p.Peer, msgBytes []byte
}
}
// Sets our private validator account for signing votes.
func (conR *ConsensusReactor) SetPrivValidator(priv *types.PrivValidator) {
conR.conS.SetPrivValidator(priv)
}
// implements events.Eventable
func (conR *ConsensusReactor) SetEventSwitch(evsw *events.EventSwitch) {
conR.evsw = evsw
conR.conS.SetEventSwitch(evsw)
}
//--------------------------------------
// Listens for new steps and votes,
// broadcasting the result to peers
func (conR *ConsensusReactor) registerEventCallbacks() {
conR.evsw.AddListenerForEvent("conR", types.EventStringNewRoundStep(), func(data types.EventData) {
rs := data.(*types.EventDataRoundState).RoundState().(*RoundState)
conR.broadcastNewRoundStep(rs)
})
conR.evsw.AddListenerForEvent("conR", types.EventStringVote(), func(data types.EventData) {
edv := data.(*types.EventDataVote)
conR.broadcastHasVoteMessage(edv.Vote, edv.Index)
})
}
func (conR *ConsensusReactor) broadcastNewRoundStep(rs *RoundState) {
nrsMsg, csMsg := makeRoundStepMessages(rs)
if nrsMsg != nil {
conR.Switch.Broadcast(StateChannel, nrsMsg)
}
if csMsg != nil {
conR.Switch.Broadcast(StateChannel, csMsg)
}
}
// Broadcasts HasVoteMessage to peers that care.
func (conR *ConsensusReactor) broadcastHasVoteMessage(vote *types.Vote, index int) {
msg := &HasVoteMessage{
@ -239,28 +282,16 @@ func (conR *ConsensusReactor) broadcastHasVoteMessage(vote *types.Vote, index in
*/
}
// Sets our private validator account for signing votes.
func (conR *ConsensusReactor) SetPrivValidator(priv *types.PrivValidator) {
conR.conS.SetPrivValidator(priv)
}
// implements events.Eventable
func (conR *ConsensusReactor) SetFireable(evsw events.Fireable) {
conR.evsw = evsw
conR.conS.SetFireable(evsw)
}
//--------------------------------------
func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) {
step := RoundStepType(rs.Step)
nrsMsg = &NewRoundStepMessage{
Height: rs.Height,
Round: rs.Round,
Step: rs.Step,
Step: step,
SecondsSinceStartTime: int(time.Now().Sub(rs.StartTime).Seconds()),
LastCommitRound: rs.LastCommit.Round(),
}
if rs.Step == RoundStepCommit {
if step == RoundStepCommit {
csMsg = &CommitStepMessage{
Height: rs.Height,
BlockPartsHeader: rs.ProposalBlockParts.Header(),
@ -270,28 +301,6 @@ func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg *
return
}
// Listens for changes to the ConsensusState.Step by pulling
// on conR.conS.NewStepCh().
func (conR *ConsensusReactor) broadcastNewRoundStepRoutine() {
for {
// Get RoundState with new Step or quit.
var rs *RoundState
select {
case rs = <-conR.conS.NewStepCh():
case <-conR.Quit:
return
}
nrsMsg, csMsg := makeRoundStepMessages(rs)
if nrsMsg != nil {
conR.Switch.Broadcast(StateChannel, nrsMsg)
}
if csMsg != nil {
conR.Switch.Broadcast(StateChannel, csMsg)
}
}
}
func (conR *ConsensusReactor) sendNewRoundStepMessage(peer *p2p.Peer) {
rs := conR.conS.GetRoundState()
nrsMsg, csMsg := makeRoundStepMessages(rs)