Fireable -> EventSwitch; rs in EventDataRoundState; fixes from review

This commit is contained in:
Ethan Buchman 2015-12-14 00:38:19 -05:00
parent 261647a012
commit b9e143d956
10 changed files with 127 additions and 166 deletions

View File

@ -52,7 +52,7 @@ type BlockchainReactor struct {
timeoutsCh chan string timeoutsCh chan string
lastBlock *types.Block lastBlock *types.Block
evsw events.Fireable evsw *events.EventSwitch
} }
func NewBlockchainReactor(state *sm.State, proxyAppCtx proxy.AppContext, store *BlockStore, sync bool) *BlockchainReactor { func NewBlockchainReactor(state *sm.State, proxyAppCtx proxy.AppContext, store *BlockStore, sync bool) *BlockchainReactor {
@ -261,7 +261,7 @@ func (bcR *BlockchainReactor) BroadcastStatusRequest() error {
} }
// implements events.Eventable // implements events.Eventable
func (bcR *BlockchainReactor) SetFireable(evsw events.Fireable) { func (bcR *BlockchainReactor) SetEventSwitch(evsw *events.EventSwitch) {
bcR.evsw = evsw bcR.evsw = evsw
} }

View File

@ -324,7 +324,7 @@ func simpleConsensusState(nValidators int) (*ConsensusState, []*validatorStub) {
cs.SetPrivValidator(privVals[0]) cs.SetPrivValidator(privVals[0])
evsw := events.NewEventSwitch() evsw := events.NewEventSwitch()
cs.SetFireable(evsw) cs.SetEventSwitch(evsw)
evsw.Start() evsw.Start()
// start the transition routines // start the transition routines
@ -340,7 +340,7 @@ func simpleConsensusState(nValidators int) (*ConsensusState, []*validatorStub) {
} }
func subscribeToVoter(cs *ConsensusState, addr []byte) chan interface{} { func subscribeToVoter(cs *ConsensusState, addr []byte) chan interface{} {
voteCh0 := cs.evsw.(*events.EventSwitch).SubscribeToEvent(types.EventStringVote(), 0) voteCh0 := cs.evsw.SubscribeToEvent("tester", types.EventStringVote(), 0)
voteCh := make(chan interface{}) voteCh := make(chan interface{})
go func() { go func() {
for { for {

View File

@ -34,7 +34,7 @@ type ConsensusReactor struct {
blockStore *bc.BlockStore blockStore *bc.BlockStore
conS *ConsensusState conS *ConsensusState
fastSync bool fastSync bool
evsw events.Fireable evsw *events.EventSwitch
} }
func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore, fastSync bool) *ConsensusReactor { func NewConsensusReactor(consensusState *ConsensusState, blockStore *bc.BlockStore, fastSync bool) *ConsensusReactor {
@ -221,9 +221,9 @@ func (conR *ConsensusReactor) SetPrivValidator(priv *types.PrivValidator) {
} }
// implements events.Eventable // implements events.Eventable
func (conR *ConsensusReactor) SetFireable(evsw events.Fireable) { func (conR *ConsensusReactor) SetEventSwitch(evsw *events.EventSwitch) {
conR.evsw = evsw conR.evsw = evsw
conR.conS.SetFireable(evsw) conR.conS.SetEventSwitch(evsw)
} }
//-------------------------------------- //--------------------------------------
@ -231,21 +231,19 @@ func (conR *ConsensusReactor) SetFireable(evsw events.Fireable) {
// Listens for new steps and votes, // Listens for new steps and votes,
// broadcasting the result to peers // broadcasting the result to peers
func (conR *ConsensusReactor) registerEventCallbacks() { func (conR *ConsensusReactor) registerEventCallbacks() {
// XXX: should we change SetFireable to just use EventSwitch so we don't need these assertions?
evsw := conR.evsw.(*events.EventSwitch)
evsw.AddListenerForEvent("conR", types.EventStringNewRoundStep(), func(data types.EventData) { conR.evsw.AddListenerForEvent("conR", types.EventStringNewRoundStep(), func(data types.EventData) {
rs := data.(*types.EventDataRoundState) rs := data.(*types.EventDataRoundState).RoundState().(*RoundState)
conR.broadcastNewRoundStep(rs) conR.broadcastNewRoundStep(rs)
}) })
evsw.AddListenerForEvent("conR", types.EventStringVote(), func(data types.EventData) { conR.evsw.AddListenerForEvent("conR", types.EventStringVote(), func(data types.EventData) {
edv := data.(*types.EventDataVote) edv := data.(*types.EventDataVote)
conR.broadcastHasVoteMessage(edv.Vote, edv.Index) conR.broadcastHasVoteMessage(edv.Vote, edv.Index)
}) })
} }
func (conR *ConsensusReactor) broadcastNewRoundStep(rs *types.EventDataRoundState) { func (conR *ConsensusReactor) broadcastNewRoundStep(rs *RoundState) {
nrsMsg, csMsg := makeRoundStepMessages(rs) nrsMsg, csMsg := makeRoundStepMessages(rs)
if nrsMsg != nil { if nrsMsg != nil {
@ -282,20 +280,20 @@ func (conR *ConsensusReactor) broadcastHasVoteMessage(vote *types.Vote, index in
*/ */
} }
func makeRoundStepMessages(rs *types.EventDataRoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) { func makeRoundStepMessages(rs *RoundState) (nrsMsg *NewRoundStepMessage, csMsg *CommitStepMessage) {
step := RoundStepType(rs.Step) step := RoundStepType(rs.Step)
nrsMsg = &NewRoundStepMessage{ nrsMsg = &NewRoundStepMessage{
Height: rs.Height, Height: rs.Height,
Round: rs.Round, Round: rs.Round,
Step: step, Step: step,
SecondsSinceStartTime: int(time.Now().Sub(rs.StartTime).Seconds()), SecondsSinceStartTime: int(time.Now().Sub(rs.StartTime).Seconds()),
LastCommitRound: rs.LastCommitRound, LastCommitRound: rs.LastCommit.Round(),
} }
if step == RoundStepCommit { if step == RoundStepCommit {
csMsg = &CommitStepMessage{ csMsg = &CommitStepMessage{
Height: rs.Height, Height: rs.Height,
BlockPartsHeader: rs.BlockPartsHeader, BlockPartsHeader: rs.ProposalBlockParts.Header(),
BlockParts: rs.BlockParts, BlockParts: rs.ProposalBlockParts.BitArray(),
} }
} }
return return
@ -303,7 +301,7 @@ func makeRoundStepMessages(rs *types.EventDataRoundState) (nrsMsg *NewRoundStepM
func (conR *ConsensusReactor) sendNewRoundStepMessage(peer *p2p.Peer) { func (conR *ConsensusReactor) sendNewRoundStepMessage(peer *p2p.Peer) {
rs := conR.conS.GetRoundState() rs := conR.conS.GetRoundState()
nrsMsg, csMsg := makeRoundStepMessages(rs.RoundStateEvent()) nrsMsg, csMsg := makeRoundStepMessages(rs)
if nrsMsg != nil { if nrsMsg != nil {
peer.Send(StateChannel, nrsMsg) peer.Send(StateChannel, nrsMsg)
} }

View File

@ -98,27 +98,13 @@ type RoundState struct {
} }
func (rs *RoundState) RoundStateEvent() *types.EventDataRoundState { func (rs *RoundState) RoundStateEvent() *types.EventDataRoundState {
var header types.PartSetHeader edrs := &types.EventDataRoundState{
var parts *BitArray Height: rs.Height,
if rs.ProposalBlockParts != nil { Round: rs.Round,
header = rs.ProposalBlockParts.Header() Step: rs.Step.String(),
parts = rs.ProposalBlockParts.BitArray()
}
return &types.EventDataRoundState{
CurrentTime: time.Now(),
Height: rs.Height,
Round: rs.Round,
Step: int(rs.Step),
StartTime: rs.StartTime,
CommitTime: rs.CommitTime,
Proposal: rs.Proposal,
ProposalBlock: rs.ProposalBlock,
LockedRound: rs.LockedRound,
LockedBlock: rs.LockedBlock,
POLRound: rs.Votes.POLRound(),
BlockPartsHeader: header,
BlockParts: parts,
} }
edrs.SetRoundState(rs)
return edrs
} }
func (rs *RoundState) String() string { func (rs *RoundState) String() string {
@ -204,7 +190,7 @@ type ConsensusState struct {
tickChan chan timeoutInfo // start the timeoutTicker in the timeoutRoutine tickChan chan timeoutInfo // start the timeoutTicker in the timeoutRoutine
tockChan chan timeoutInfo // timeouts are relayed on tockChan to the receiveRoutine tockChan chan timeoutInfo // timeouts are relayed on tockChan to the receiveRoutine
evsw events.Fireable evsw *events.EventSwitch
evc *events.EventCache // set in stageBlock and passed into state evc *events.EventCache // set in stageBlock and passed into state
nSteps int // used for testing to limit the number of transitions the state makes nSteps int // used for testing to limit the number of transitions the state makes
@ -233,7 +219,7 @@ func NewConsensusState(state *sm.State, proxyAppCtx proxy.AppContext, blockStore
// Public interface // Public interface
// implements events.Eventable // implements events.Eventable
func (cs *ConsensusState) SetFireable(evsw events.Fireable) { func (cs *ConsensusState) SetEventSwitch(evsw *events.EventSwitch) {
cs.evsw = evsw cs.evsw = evsw
} }
@ -641,9 +627,7 @@ func (cs *ConsensusState) handleTimeout(ti timeoutInfo, rs RoundState) {
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
// State functions // State functions
// Many of these functions are capitalized but are not really meant to be used // Used internally by handleTimeout and handleMsg to make state transitions
// by external code as it will cause race conditions with running timeout/receiveRoutine.
// Use AddVote, SetProposal, AddProposalBlockPart instead
// Enter: +2/3 precommits for nil at (height,round-1) // Enter: +2/3 precommits for nil at (height,round-1)
// Enter: `timeoutPrecommits` after any +2/3 precommits from (height,round-1) // Enter: `timeoutPrecommits` after any +2/3 precommits from (height,round-1)
@ -706,6 +690,13 @@ func (cs *ConsensusState) enterPropose(height int, round int) {
// Done enterPropose: // Done enterPropose:
cs.updateRoundStep(round, RoundStepPropose) cs.updateRoundStep(round, RoundStepPropose)
cs.newStep() cs.newStep()
// If we have the whole proposal + POL, then goto Prevote now.
// else, we'll enterPrevote when the rest of the proposal is received (in AddProposalBlockPart),
// or else after timeoutPropose
if cs.isProposalComplete() {
cs.enterPrevote(height, cs.Round)
}
}() }()
// This step times out after `timeoutPropose` // This step times out after `timeoutPropose`
@ -723,12 +714,6 @@ func (cs *ConsensusState) enterPropose(height int, round int) {
cs.decideProposal(height, round) cs.decideProposal(height, round)
} }
// If we have the whole proposal + POL, then goto Prevote now.
// else, we'll enterPrevote when the rest of the proposal is received (in AddProposalBlockPart),
// or else after timeoutPropose
if cs.isProposalComplete() {
cs.enterPrevote(height, cs.Round)
}
} }
func (cs *ConsensusState) decideProposal(height, round int) { func (cs *ConsensusState) decideProposal(height, round int) {
@ -1117,29 +1102,29 @@ func (cs *ConsensusState) tryFinalizeCommit(height int) {
return return
} }
// go // go
cs.FinalizeCommit(height) cs.finalizeCommit(height)
} }
// Increment height and goto RoundStepNewHeight // Increment height and goto RoundStepNewHeight
func (cs *ConsensusState) FinalizeCommit(height int) { func (cs *ConsensusState) finalizeCommit(height int) {
//cs.mtx.Lock() //cs.mtx.Lock()
//defer cs.mtx.Unlock() //defer cs.mtx.Unlock()
if cs.Height != height || cs.Step != RoundStepCommit { if cs.Height != height || cs.Step != RoundStepCommit {
log.Debug(Fmt("FinalizeCommit(%v): Invalid args. Current step: %v/%v/%v", height, cs.Height, cs.Round, cs.Step)) log.Debug(Fmt("finalizeCommit(%v): Invalid args. Current step: %v/%v/%v", height, cs.Height, cs.Round, cs.Step))
return return
} }
hash, header, ok := cs.Votes.Precommits(cs.CommitRound).TwoThirdsMajority() hash, header, ok := cs.Votes.Precommits(cs.CommitRound).TwoThirdsMajority()
if !ok { if !ok {
PanicSanity(Fmt("Cannot FinalizeCommit, commit does not have two thirds majority")) PanicSanity(Fmt("Cannot finalizeCommit, commit does not have two thirds majority"))
} }
if !cs.ProposalBlockParts.HasHeader(header) { if !cs.ProposalBlockParts.HasHeader(header) {
PanicSanity(Fmt("Expected ProposalBlockParts header to be commit header")) PanicSanity(Fmt("Expected ProposalBlockParts header to be commit header"))
} }
if !cs.ProposalBlock.HashesTo(hash) { if !cs.ProposalBlock.HashesTo(hash) {
PanicSanity(Fmt("Cannot FinalizeCommit, ProposalBlock does not hash to commit hash")) PanicSanity(Fmt("Cannot finalizeCommit, ProposalBlock does not hash to commit hash"))
} }
if err := cs.stageBlock(cs.ProposalBlock, cs.ProposalBlockParts); err != nil { if err := cs.stageBlock(cs.ProposalBlock, cs.ProposalBlockParts); err != nil {
PanicConsensus(Fmt("+2/3 committed an invalid block: %v", err)) PanicConsensus(Fmt("+2/3 committed an invalid block: %v", err))
@ -1378,7 +1363,7 @@ func (cs *ConsensusState) stageBlock(block *types.Block, blockParts *types.PartS
// Create a copy of the state for staging // Create a copy of the state for staging
stateCopy := cs.state.Copy() stateCopy := cs.state.Copy()
stateCopy.SetFireable(cs.evc) stateCopy.SetEventCache(cs.evc)
// Run the block on the State: // Run the block on the State:
// + update validator sets // + update validator sets

View File

@ -8,7 +8,6 @@ import (
_ "github.com/tendermint/tendermint/config/tendermint_test" _ "github.com/tendermint/tendermint/config/tendermint_test"
//"github.com/tendermint/tendermint/events" //"github.com/tendermint/tendermint/events"
"github.com/tendermint/tendermint/events"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
@ -54,9 +53,8 @@ func TestProposerSelection0(t *testing.T) {
cs1, vss := simpleConsensusState(4) cs1, vss := simpleConsensusState(4)
height, round := cs1.Height, cs1.Round height, round := cs1.Height, cs1.Round
evsw := cs1.evsw.(*events.EventSwitch) newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1)
newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound(), 1) proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0)
proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal(), 0)
startTestRound(cs1, height, round) startTestRound(cs1, height, round)
@ -88,8 +86,7 @@ func TestProposerSelection0(t *testing.T) {
func TestProposerSelection2(t *testing.T) { func TestProposerSelection2(t *testing.T) {
cs1, vss := simpleConsensusState(4) // test needs more work for more than 3 validators cs1, vss := simpleConsensusState(4) // test needs more work for more than 3 validators
evsw := cs1.evsw.(*events.EventSwitch) newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1)
newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound(), 1)
// this time we jump in at round 2 // this time we jump in at round 2
incrementRound(vss[1:]...) incrementRound(vss[1:]...)
@ -120,9 +117,8 @@ func TestEnterProposeNoPrivValidator(t *testing.T) {
cs.SetPrivValidator(nil) cs.SetPrivValidator(nil)
height, round := cs.Height, cs.Round height, round := cs.Height, cs.Round
evsw := cs.evsw.(*events.EventSwitch)
// Listen for propose timeout event // Listen for propose timeout event
timeoutCh := evsw.SubscribeToEvent(types.EventStringTimeoutPropose(), 0) timeoutCh := cs.evsw.SubscribeToEvent("tester", types.EventStringTimeoutPropose(), 0)
startTestRound(cs, height, round) startTestRound(cs, height, round)
@ -146,9 +142,9 @@ func TestEnterProposeYesPrivValidator(t *testing.T) {
height, round := cs.Height, cs.Round height, round := cs.Height, cs.Round
// Listen for propose timeout event // Listen for propose timeout event
evsw := cs.evsw.(*events.EventSwitch)
timeoutCh := evsw.SubscribeToEvent(types.EventStringTimeoutPropose(), 0) timeoutCh := cs.evsw.SubscribeToEvent("tester", types.EventStringTimeoutPropose(), 0)
proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal(), 0) proposalCh := cs.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0)
cs.enterNewRound(height, round) cs.enterNewRound(height, round)
cs.startRoutines(3) cs.startRoutines(3)
@ -182,9 +178,8 @@ func TestBadProposal(t *testing.T) {
height, round := cs1.Height, cs1.Round height, round := cs1.Height, cs1.Round
cs2 := vss[1] cs2 := vss[1]
evsw := cs1.evsw.(*events.EventSwitch) proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0)
proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal(), 0) voteCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringVote(), 0)
voteCh := evsw.SubscribeToEvent(types.EventStringVote(), 0)
propBlock, _ := cs1.createProposalBlock() //changeProposer(t, cs1, cs2) propBlock, _ := cs1.createProposalBlock() //changeProposer(t, cs1, cs2)
@ -238,10 +233,9 @@ func TestFullRound1(t *testing.T) {
cs, vss := simpleConsensusState(1) cs, vss := simpleConsensusState(1)
height, round := cs.Height, cs.Round height, round := cs.Height, cs.Round
evsw := cs.evsw.(*events.EventSwitch) voteCh := cs.evsw.SubscribeToEvent("tester", types.EventStringVote(), 0)
voteCh := evsw.SubscribeToEvent(types.EventStringVote(), 0) propCh := cs.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0)
propCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal(), 0) newRoundCh := cs.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1)
newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound(), 1)
startTestRound(cs, height, round) startTestRound(cs, height, round)
@ -249,7 +243,7 @@ func TestFullRound1(t *testing.T) {
// grab proposal // grab proposal
re := <-propCh re := <-propCh
propBlockHash := re.(*types.EventDataRoundState).ProposalBlock.Hash() propBlockHash := re.(*types.EventDataRoundState).RoundState().(*RoundState).ProposalBlock.Hash()
<-voteCh // wait for prevote <-voteCh // wait for prevote
validatePrevote(t, cs, round, vss[0], propBlockHash) validatePrevote(t, cs, round, vss[0], propBlockHash)
@ -267,8 +261,7 @@ func TestFullRoundNil(t *testing.T) {
cs, vss := simpleConsensusState(1) cs, vss := simpleConsensusState(1)
height, round := cs.Height, cs.Round height, round := cs.Height, cs.Round
evsw := cs.evsw.(*events.EventSwitch) voteCh := cs.evsw.SubscribeToEvent("tester", types.EventStringVote(), 0)
voteCh := evsw.SubscribeToEvent(types.EventStringVote(), 0)
cs.enterPrevote(height, round) cs.enterPrevote(height, round)
cs.startRoutines(4) cs.startRoutines(4)
@ -287,9 +280,8 @@ func TestFullRound2(t *testing.T) {
cs2 := vss[1] cs2 := vss[1]
height, round := cs1.Height, cs1.Round height, round := cs1.Height, cs1.Round
evsw := cs1.evsw.(*events.EventSwitch) voteCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringVote(), 0)
voteCh := evsw.SubscribeToEvent(types.EventStringVote(), 0) newBlockCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewBlock(), 0)
newBlockCh := evsw.SubscribeToEvent(types.EventStringNewBlock(), 0)
// start round and wait for propose and prevote // start round and wait for propose and prevote
startTestRound(cs1, height, round) startTestRound(cs1, height, round)
@ -329,12 +321,11 @@ func TestLockNoPOL(t *testing.T) {
cs2 := vss[1] cs2 := vss[1]
height := cs1.Height height := cs1.Height
evsw := cs1.evsw.(*events.EventSwitch) timeoutProposeCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutPropose(), 0)
timeoutProposeCh := evsw.SubscribeToEvent(types.EventStringTimeoutPropose(), 0) timeoutWaitCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutWait(), 0)
timeoutWaitCh := evsw.SubscribeToEvent(types.EventStringTimeoutWait(), 0) voteCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringVote(), 0)
voteCh := evsw.SubscribeToEvent(types.EventStringVote(), 0) proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0)
proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal(), 0) newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1)
newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound(), 1)
/* /*
Round1 (cs1, B) // B B // B B2 Round1 (cs1, B) // B B // B B2
@ -345,7 +336,7 @@ func TestLockNoPOL(t *testing.T) {
cs1.startRoutines(0) cs1.startRoutines(0)
re := <-proposalCh re := <-proposalCh
rs := re.(*types.EventDataRoundState) rs := re.(*types.EventDataRoundState).RoundState().(*RoundState)
theBlockHash := rs.ProposalBlock.Hash() theBlockHash := rs.ProposalBlock.Hash()
<-voteCh // prevote <-voteCh // prevote
@ -385,7 +376,7 @@ func TestLockNoPOL(t *testing.T) {
// now we're on a new round and not the proposer, so wait for timeout // now we're on a new round and not the proposer, so wait for timeout
re = <-timeoutProposeCh re = <-timeoutProposeCh
rs = re.(*types.EventDataRoundState) rs = re.(*types.EventDataRoundState).RoundState().(*RoundState)
if rs.ProposalBlock != nil { if rs.ProposalBlock != nil {
t.Fatal("Expected proposal block to be nil") t.Fatal("Expected proposal block to be nil")
@ -429,7 +420,7 @@ func TestLockNoPOL(t *testing.T) {
incrementRound(cs2) incrementRound(cs2)
re = <-proposalCh re = <-proposalCh
rs = re.(*types.EventDataRoundState) rs = re.(*types.EventDataRoundState).RoundState().(*RoundState)
// now we're on a new round and are the proposer // now we're on a new round and are the proposer
if !bytes.Equal(rs.ProposalBlock.Hash(), rs.LockedBlock.Hash()) { if !bytes.Equal(rs.ProposalBlock.Hash(), rs.LockedBlock.Hash()) {
@ -492,13 +483,12 @@ func TestLockPOLRelock(t *testing.T) {
cs1, vss := simpleConsensusState(4) cs1, vss := simpleConsensusState(4)
cs2, cs3, cs4 := vss[1], vss[2], vss[3] cs2, cs3, cs4 := vss[1], vss[2], vss[3]
evsw := cs1.evsw.(*events.EventSwitch) timeoutProposeCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutPropose(), 0)
timeoutProposeCh := evsw.SubscribeToEvent(types.EventStringTimeoutPropose(), 0) timeoutWaitCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutWait(), 0)
timeoutWaitCh := evsw.SubscribeToEvent(types.EventStringTimeoutWait(), 0) proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0)
proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal(), 0) voteCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringVote(), 0)
voteCh := evsw.SubscribeToEvent(types.EventStringVote(), 0) newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1)
newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound(), 1) newBlockCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewBlock(), 0)
newBlockCh := evsw.SubscribeToEvent(types.EventStringNewBlock(), 0)
log.Debug("cs2 last round", "lr", cs2.PrivValidator.LastRound) log.Debug("cs2 last round", "lr", cs2.PrivValidator.LastRound)
@ -515,7 +505,7 @@ func TestLockPOLRelock(t *testing.T) {
<-newRoundCh <-newRoundCh
re := <-proposalCh re := <-proposalCh
rs := re.(*types.EventDataRoundState) rs := re.(*types.EventDataRoundState).RoundState().(*RoundState)
theBlockHash := rs.ProposalBlock.Hash() theBlockHash := rs.ProposalBlock.Hash()
<-voteCh // prevote <-voteCh // prevote
@ -586,7 +576,7 @@ func TestLockPOLRelock(t *testing.T) {
be := <-newBlockCh be := <-newBlockCh
b := be.(types.EventDataNewBlock) b := be.(types.EventDataNewBlock)
re = <-newRoundCh re = <-newRoundCh
rs = re.(*types.EventDataRoundState) rs = re.(*types.EventDataRoundState).RoundState().(*RoundState)
if rs.Height != 2 { if rs.Height != 2 {
t.Fatal("Expected height to increment") t.Fatal("Expected height to increment")
} }
@ -601,12 +591,11 @@ func TestLockPOLUnlock(t *testing.T) {
cs1, vss := simpleConsensusState(4) cs1, vss := simpleConsensusState(4)
cs2, cs3, cs4 := vss[1], vss[2], vss[3] cs2, cs3, cs4 := vss[1], vss[2], vss[3]
evsw := cs1.evsw.(*events.EventSwitch) proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0)
proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal(), 0) timeoutProposeCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutPropose(), 0)
timeoutProposeCh := evsw.SubscribeToEvent(types.EventStringTimeoutPropose(), 0) timeoutWaitCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutWait(), 0)
timeoutWaitCh := evsw.SubscribeToEvent(types.EventStringTimeoutWait(), 0) newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1)
newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound(), 1) unlockCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringUnlock(), 0)
unlockCh := evsw.SubscribeToEvent(types.EventStringUnlock(), 0)
voteCh := subscribeToVoter(cs1, cs1.privValidator.Address) voteCh := subscribeToVoter(cs1, cs1.privValidator.Address)
// everything done from perspective of cs1 // everything done from perspective of cs1
@ -621,7 +610,7 @@ func TestLockPOLUnlock(t *testing.T) {
startTestRound(cs1, cs1.Height, 0) startTestRound(cs1, cs1.Height, 0)
<-newRoundCh <-newRoundCh
re := <-proposalCh re := <-proposalCh
rs := re.(*types.EventDataRoundState) rs := re.(*types.EventDataRoundState).RoundState().(*RoundState)
theBlockHash := rs.ProposalBlock.Hash() theBlockHash := rs.ProposalBlock.Hash()
<-voteCh // prevote <-voteCh // prevote
@ -645,7 +634,7 @@ func TestLockPOLUnlock(t *testing.T) {
// timeout to new round // timeout to new round
re = <-timeoutWaitCh re = <-timeoutWaitCh
rs = re.(*types.EventDataRoundState) rs = re.(*types.EventDataRoundState).RoundState().(*RoundState)
lockedBlockHash := rs.LockedBlock.Hash() lockedBlockHash := rs.LockedBlock.Hash()
//XXX: this isnt gauranteed to get there before the timeoutPropose ... //XXX: this isnt gauranteed to get there before the timeoutPropose ...
@ -693,18 +682,17 @@ func TestLockPOLSafety1(t *testing.T) {
cs1, vss := simpleConsensusState(4) cs1, vss := simpleConsensusState(4)
cs2, cs3, cs4 := vss[1], vss[2], vss[3] cs2, cs3, cs4 := vss[1], vss[2], vss[3]
evsw := cs1.evsw.(*events.EventSwitch) proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0)
proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal(), 0) timeoutProposeCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutPropose(), 0)
timeoutProposeCh := evsw.SubscribeToEvent(types.EventStringTimeoutPropose(), 0) timeoutWaitCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutWait(), 0)
timeoutWaitCh := evsw.SubscribeToEvent(types.EventStringTimeoutWait(), 0) newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1)
newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound(), 1)
voteCh := subscribeToVoter(cs1, cs1.privValidator.Address) voteCh := subscribeToVoter(cs1, cs1.privValidator.Address)
// start round and wait for propose and prevote // start round and wait for propose and prevote
startTestRound(cs1, cs1.Height, 0) startTestRound(cs1, cs1.Height, 0)
<-newRoundCh <-newRoundCh
re := <-proposalCh re := <-proposalCh
rs := re.(*types.EventDataRoundState) rs := re.(*types.EventDataRoundState).RoundState().(*RoundState)
propBlock := rs.ProposalBlock propBlock := rs.ProposalBlock
<-voteCh // prevote <-voteCh // prevote
@ -752,7 +740,7 @@ func TestLockPOLSafety1(t *testing.T) {
re = <-proposalCh re = <-proposalCh
} }
rs = re.(*types.EventDataRoundState) rs = re.(*types.EventDataRoundState).RoundState().(*RoundState)
if rs.LockedBlock != nil { if rs.LockedBlock != nil {
t.Fatal("we should not be locked!") t.Fatal("we should not be locked!")
@ -792,7 +780,7 @@ func TestLockPOLSafety1(t *testing.T) {
// we should prevote what we're locked on // we should prevote what we're locked on
validatePrevote(t, cs1, 2, vss[0], propBlockHash) validatePrevote(t, cs1, 2, vss[0], propBlockHash)
newStepCh := evsw.SubscribeToEvent(types.EventStringNewRoundStep(), 0) newStepCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRoundStep(), 0)
// add prevotes from the earlier round // add prevotes from the earlier round
addVoteToFromMany(cs1, prevotes, cs2, cs3, cs4) addVoteToFromMany(cs1, prevotes, cs2, cs3, cs4)
@ -813,12 +801,11 @@ func TestLockPOLSafety2(t *testing.T) {
cs1, vss := simpleConsensusState(4) cs1, vss := simpleConsensusState(4)
cs2, cs3, cs4 := vss[1], vss[2], vss[3] cs2, cs3, cs4 := vss[1], vss[2], vss[3]
evsw := cs1.evsw.(*events.EventSwitch) proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0)
proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal(), 0) timeoutProposeCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutPropose(), 0)
timeoutProposeCh := evsw.SubscribeToEvent(types.EventStringTimeoutPropose(), 0) timeoutWaitCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutWait(), 0)
timeoutWaitCh := evsw.SubscribeToEvent(types.EventStringTimeoutWait(), 0) newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1)
newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound(), 1) unlockCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringUnlock(), 0)
unlockCh := evsw.SubscribeToEvent(types.EventStringUnlock(), 0)
voteCh := subscribeToVoter(cs1, cs1.privValidator.Address) voteCh := subscribeToVoter(cs1, cs1.privValidator.Address)
// the block for R0: gets polkad but we miss it // the block for R0: gets polkad but we miss it
@ -904,10 +891,10 @@ func TestSlashingPrevotes(t *testing.T) {
cs1, vss := simpleConsensusState(2) cs1, vss := simpleConsensusState(2)
cs2 := vss[1] cs2 := vss[1]
evsw := cs1.evsw.(*events.EventSwitch)
proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal() , 0) proposalCh := cs1.evsw.SubscribeToEvent("tester",types.EventStringCompleteProposal() , 0)
timeoutWaitCh := evsw.SubscribeToEvent(types.EventStringTimeoutWait() , 0) timeoutWaitCh := cs1.evsw.SubscribeToEvent("tester",types.EventStringTimeoutWait() , 0)
newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound() , 1) newRoundCh := cs1.evsw.SubscribeToEvent("tester",types.EventStringNewRound() , 1)
voteCh := subscribeToVoter(cs1, cs1.privValidator.Address) voteCh := subscribeToVoter(cs1, cs1.privValidator.Address)
// start round and wait for propose and prevote // start round and wait for propose and prevote
@ -916,7 +903,7 @@ func TestSlashingPrevotes(t *testing.T) {
re := <-proposalCh re := <-proposalCh
<-voteCh // prevote <-voteCh // prevote
rs := re.(*types.EventDataRoundState) rs := re.(*types.EventDataRoundState).RoundState().(*RoundState)
// we should now be stuck in limbo forever, waiting for more prevotes // we should now be stuck in limbo forever, waiting for more prevotes
// add one for a different block should cause us to go into prevote wait // add one for a different block should cause us to go into prevote wait
@ -939,10 +926,10 @@ func TestSlashingPrecommits(t *testing.T) {
cs1, vss := simpleConsensusState(2) cs1, vss := simpleConsensusState(2)
cs2 := vss[1] cs2 := vss[1]
evsw := cs1.evsw.(*events.EventSwitch)
proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal() , 0) proposalCh := cs1.evsw.SubscribeToEvent("tester",types.EventStringCompleteProposal() , 0)
timeoutWaitCh := evsw.SubscribeToEvent(types.EventStringTimeoutWait() , 0) timeoutWaitCh := cs1.evsw.SubscribeToEvent("tester",types.EventStringTimeoutWait() , 0)
newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound() , 1) newRoundCh := cs1.evsw.SubscribeToEvent("tester",types.EventStringNewRound() , 1)
voteCh := subscribeToVoter(cs1, cs1.privValidator.Address) voteCh := subscribeToVoter(cs1, cs1.privValidator.Address)
// start round and wait for propose and prevote // start round and wait for propose and prevote
@ -984,18 +971,17 @@ func TestHalt1(t *testing.T) {
cs1, vss := simpleConsensusState(4) cs1, vss := simpleConsensusState(4)
cs2, cs3, cs4 := vss[1], vss[2], vss[3] cs2, cs3, cs4 := vss[1], vss[2], vss[3]
evsw := cs1.evsw.(*events.EventSwitch) proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0)
proposalCh := evsw.SubscribeToEvent(types.EventStringCompleteProposal(), 0) timeoutWaitCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutWait(), 0)
timeoutWaitCh := evsw.SubscribeToEvent(types.EventStringTimeoutWait(), 0) newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1)
newRoundCh := evsw.SubscribeToEvent(types.EventStringNewRound(), 1) newBlockCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewBlock(), 0)
newBlockCh := evsw.SubscribeToEvent(types.EventStringNewBlock(), 0)
voteCh := subscribeToVoter(cs1, cs1.privValidator.Address) voteCh := subscribeToVoter(cs1, cs1.privValidator.Address)
// start round and wait for propose and prevote // start round and wait for propose and prevote
startTestRound(cs1, cs1.Height, 0) startTestRound(cs1, cs1.Height, 0)
<-newRoundCh <-newRoundCh
re := <-proposalCh re := <-proposalCh
rs := re.(*types.EventDataRoundState) rs := re.(*types.EventDataRoundState).RoundState().(*RoundState)
propBlock := rs.ProposalBlock propBlock := rs.ProposalBlock
propBlockParts := propBlock.MakePartSet() propBlockParts := propBlock.MakePartSet()
@ -1018,7 +1004,7 @@ func TestHalt1(t *testing.T) {
// timeout to new round // timeout to new round
<-timeoutWaitCh <-timeoutWaitCh
re = <-newRoundCh re = <-newRoundCh
rs = re.(*types.EventDataRoundState) rs = re.(*types.EventDataRoundState).RoundState().(*RoundState)
log.Notice("### ONTO ROUND 1") log.Notice("### ONTO ROUND 1")
/*Round2 /*Round2
@ -1036,7 +1022,7 @@ func TestHalt1(t *testing.T) {
// receiving that precommit should take us straight to commit // receiving that precommit should take us straight to commit
<-newBlockCh <-newBlockCh
re = <-newRoundCh re = <-newRoundCh
rs = re.(*types.EventDataRoundState) rs = re.(*types.EventDataRoundState).RoundState().(*RoundState)
if rs.Height != 2 { if rs.Height != 2 {
t.Fatal("expected height to increment") t.Fatal("expected height to increment")

View File

@ -10,7 +10,7 @@ import (
// reactors and other modules should export // reactors and other modules should export
// this interface to become eventable // this interface to become eventable
type Eventable interface { type Eventable interface {
SetFireable(Fireable) SetEventSwitch(evsw *EventSwitch)
} }
// an event switch or cache implements fireable // an event switch or cache implements fireable
@ -123,10 +123,10 @@ func (evsw *EventSwitch) FireEvent(event string, data types.EventData) {
eventCell.FireEvent(data) eventCell.FireEvent(data)
} }
func (evsw *EventSwitch) SubscribeToEvent(eventID string, chanCap int) chan interface{} { func (evsw *EventSwitch) SubscribeToEvent(receiver, eventID string, chanCap int) chan interface{} {
// listen for new round // listen for new round
ch := make(chan interface{}, chanCap) ch := make(chan interface{}, chanCap)
evsw.AddListenerForEvent("tester", eventID, func(data types.EventData) { evsw.AddListenerForEvent(receiver, eventID, func(data types.EventData) {
// NOTE: in production, evsw callbacks should be nonblocking. // NOTE: in production, evsw callbacks should be nonblocking.
ch <- data ch <- data
}) })

View File

@ -25,7 +25,7 @@ const (
type MempoolReactor struct { type MempoolReactor struct {
p2p.BaseReactor p2p.BaseReactor
Mempool *Mempool // TODO: un-expose Mempool *Mempool // TODO: un-expose
evsw events.Fireable evsw *events.EventSwitch
} }
func NewMempoolReactor(mempool *Mempool) *MempoolReactor { func NewMempoolReactor(mempool *Mempool) *MempoolReactor {
@ -135,7 +135,7 @@ func (memR *MempoolReactor) broadcastTxRoutine(peer Peer) {
} }
// implements events.Eventable // implements events.Eventable
func (memR *MempoolReactor) SetFireable(evsw events.Fireable) { func (memR *MempoolReactor) SetEventSwitch(evsw *events.EventSwitch) {
memR.evsw = evsw memR.evsw = evsw
} }

View File

@ -101,7 +101,7 @@ func NewNode() *Node {
// add the event switch to all services // add the event switch to all services
// they should all satisfy events.Eventable // they should all satisfy events.Eventable
SetFireable(eventSwitch, bcReactor, mempoolReactor, consensusReactor) SetEventSwitch(eventSwitch, bcReactor, mempoolReactor, consensusReactor)
// run the profile server // run the profile server
profileHost := config.GetString("prof_laddr") profileHost := config.GetString("prof_laddr")
@ -144,9 +144,9 @@ func (n *Node) Stop() {
} }
// Add the event switch to reactors, mempool, etc. // Add the event switch to reactors, mempool, etc.
func SetFireable(evsw *events.EventSwitch, eventables ...events.Eventable) { func SetEventSwitch(evsw *events.EventSwitch, eventables ...events.Eventable) {
for _, e := range eventables { for _, e := range eventables {
e.SetFireable(evsw) e.SetEventSwitch(evsw)
} }
} }

View File

@ -33,7 +33,7 @@ type State struct {
LastValidators *types.ValidatorSet LastValidators *types.ValidatorSet
LastAppHash []byte LastAppHash []byte
evc events.Fireable // typically an events.EventCache evc *events.EventCache
} }
func LoadState(db dbm.DB) *State { func LoadState(db dbm.DB) *State {
@ -81,8 +81,7 @@ func (s *State) Save() {
s.db.Set(stateKey, buf.Bytes()) s.db.Set(stateKey, buf.Bytes())
} }
// Implements events.Eventable. Typically uses events.EventCache func (s *State) SetEventCache(evc *events.EventCache) {
func (s *State) SetFireable(evc events.Fireable) {
s.mtx.Lock() s.mtx.Lock()
defer s.mtx.Unlock() defer s.mtx.Unlock()

View File

@ -1,9 +1,6 @@
package types package types
import ( import (
"time"
"github.com/tendermint/go-common"
"github.com/tendermint/go-wire" "github.com/tendermint/go-wire"
) )
@ -74,25 +71,21 @@ type EventDataApp struct {
Data []byte `json:"bytes"` Data []byte `json:"bytes"`
} }
// We fire the most recent round state that led to the event
// (ie. NewRound will have the previous rounds state)
type EventDataRoundState struct { type EventDataRoundState struct {
CurrentTime time.Time `json:"current_time"` Height int `json:"height"`
Round int `json:"round"`
Step string `json:"step"`
Height int `json:"height"` // private, not exposed to websockets
Round int `json:"round"` rs interface{}
Step int `json:"step"` }
LastCommitRound int `json:"last_commit_round"`
StartTime time.Time `json:"start_time"`
CommitTime time.Time `json:"commit_time"`
Proposal *Proposal `json:"proposal"`
ProposalBlock *Block `json:"proposal_block"`
LockedRound int `json:"locked_round"`
LockedBlock *Block `json:"locked_block"`
POLRound int `json:"pol_round"`
BlockPartsHeader PartSetHeader `json:"block_parts_header"` func (edrs *EventDataRoundState) RoundState() interface{} {
BlockParts *common.BitArray `json:"block_parts"` return edrs.rs
}
func (edrs *EventDataRoundState) SetRoundState(rs interface{}) {
edrs.rs = rs
} }
type EventDataVote struct { type EventDataVote struct {