mirror of
https://github.com/fluencelabs/tendermint
synced 2025-06-16 22:51:22 +00:00
Make subscribeToEvent have capacity 1
This commit is contained in:
@ -9,9 +9,9 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
dbm "github.com/tendermint/go-db"
|
dbm "github.com/tendermint/go-db"
|
||||||
|
"github.com/tendermint/go-events"
|
||||||
bc "github.com/tendermint/tendermint/blockchain"
|
bc "github.com/tendermint/tendermint/blockchain"
|
||||||
_ "github.com/tendermint/tendermint/config/tendermint_test"
|
_ "github.com/tendermint/tendermint/config/tendermint_test"
|
||||||
"github.com/tendermint/go-events"
|
|
||||||
mempl "github.com/tendermint/tendermint/mempool"
|
mempl "github.com/tendermint/tendermint/mempool"
|
||||||
"github.com/tendermint/tendermint/proxy"
|
"github.com/tendermint/tendermint/proxy"
|
||||||
sm "github.com/tendermint/tendermint/state"
|
sm "github.com/tendermint/tendermint/state"
|
||||||
@ -339,7 +339,7 @@ func simpleConsensusState(nValidators int) (*ConsensusState, []*validatorStub) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func subscribeToVoter(cs *ConsensusState, addr []byte) chan interface{} {
|
func subscribeToVoter(cs *ConsensusState, addr []byte) chan interface{} {
|
||||||
voteCh0 := cs.evsw.SubscribeToEvent("tester", types.EventStringVote(), 0)
|
voteCh0 := subscribeToEvent(cs.evsw, "tester", types.EventStringVote(), 1)
|
||||||
voteCh := make(chan interface{})
|
voteCh := make(chan interface{})
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
@ -386,3 +386,13 @@ func startTestRound(cs *ConsensusState, height, round int) {
|
|||||||
cs.enterNewRound(height, round)
|
cs.enterNewRound(height, round)
|
||||||
cs.startRoutines(0)
|
cs.startRoutines(0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// NOTE: this is blocking
|
||||||
|
func subscribeToEvent(evsw *events.EventSwitch, receiver, eventID string, chanCap int) chan interface{} {
|
||||||
|
// listen for new round
|
||||||
|
ch := make(chan interface{}, chanCap)
|
||||||
|
evsw.AddListenerForEvent(receiver, eventID, func(data events.EventData) {
|
||||||
|
ch <- data
|
||||||
|
})
|
||||||
|
return ch
|
||||||
|
}
|
||||||
|
@ -53,8 +53,8 @@ func TestProposerSelection0(t *testing.T) {
|
|||||||
cs1, vss := simpleConsensusState(4)
|
cs1, vss := simpleConsensusState(4)
|
||||||
height, round := cs1.Height, cs1.Round
|
height, round := cs1.Height, cs1.Round
|
||||||
|
|
||||||
newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1)
|
newRoundCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringNewRound(), 1)
|
||||||
proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0)
|
proposalCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringCompleteProposal(), 1)
|
||||||
|
|
||||||
startTestRound(cs1, height, round)
|
startTestRound(cs1, height, round)
|
||||||
|
|
||||||
@ -86,7 +86,7 @@ func TestProposerSelection0(t *testing.T) {
|
|||||||
func TestProposerSelection2(t *testing.T) {
|
func TestProposerSelection2(t *testing.T) {
|
||||||
cs1, vss := simpleConsensusState(4) // test needs more work for more than 3 validators
|
cs1, vss := simpleConsensusState(4) // test needs more work for more than 3 validators
|
||||||
|
|
||||||
newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1)
|
newRoundCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringNewRound(), 1)
|
||||||
|
|
||||||
// this time we jump in at round 2
|
// this time we jump in at round 2
|
||||||
incrementRound(vss[1:]...)
|
incrementRound(vss[1:]...)
|
||||||
@ -118,7 +118,7 @@ func TestEnterProposeNoPrivValidator(t *testing.T) {
|
|||||||
height, round := cs.Height, cs.Round
|
height, round := cs.Height, cs.Round
|
||||||
|
|
||||||
// Listen for propose timeout event
|
// Listen for propose timeout event
|
||||||
timeoutCh := cs.evsw.SubscribeToEvent("tester", types.EventStringTimeoutPropose(), 0)
|
timeoutCh := subscribeToEvent(cs.evsw, "tester", types.EventStringTimeoutPropose(), 1)
|
||||||
|
|
||||||
startTestRound(cs, height, round)
|
startTestRound(cs, height, round)
|
||||||
|
|
||||||
@ -143,8 +143,8 @@ func TestEnterProposeYesPrivValidator(t *testing.T) {
|
|||||||
|
|
||||||
// Listen for propose timeout event
|
// Listen for propose timeout event
|
||||||
|
|
||||||
timeoutCh := cs.evsw.SubscribeToEvent("tester", types.EventStringTimeoutPropose(), 0)
|
timeoutCh := subscribeToEvent(cs.evsw, "tester", types.EventStringTimeoutPropose(), 1)
|
||||||
proposalCh := cs.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0)
|
proposalCh := subscribeToEvent(cs.evsw, "tester", types.EventStringCompleteProposal(), 1)
|
||||||
|
|
||||||
cs.enterNewRound(height, round)
|
cs.enterNewRound(height, round)
|
||||||
cs.startRoutines(3)
|
cs.startRoutines(3)
|
||||||
@ -178,8 +178,8 @@ func TestBadProposal(t *testing.T) {
|
|||||||
height, round := cs1.Height, cs1.Round
|
height, round := cs1.Height, cs1.Round
|
||||||
cs2 := vss[1]
|
cs2 := vss[1]
|
||||||
|
|
||||||
proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0)
|
proposalCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringCompleteProposal(), 1)
|
||||||
voteCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringVote(), 0)
|
voteCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringVote(), 1)
|
||||||
|
|
||||||
propBlock, _ := cs1.createProposalBlock() //changeProposer(t, cs1, cs2)
|
propBlock, _ := cs1.createProposalBlock() //changeProposer(t, cs1, cs2)
|
||||||
|
|
||||||
@ -233,9 +233,9 @@ func TestFullRound1(t *testing.T) {
|
|||||||
cs, vss := simpleConsensusState(1)
|
cs, vss := simpleConsensusState(1)
|
||||||
height, round := cs.Height, cs.Round
|
height, round := cs.Height, cs.Round
|
||||||
|
|
||||||
voteCh := cs.evsw.SubscribeToEvent("tester", types.EventStringVote(), 0)
|
voteCh := subscribeToEvent(cs.evsw, "tester", types.EventStringVote(), 1)
|
||||||
propCh := cs.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0)
|
propCh := subscribeToEvent(cs.evsw, "tester", types.EventStringCompleteProposal(), 1)
|
||||||
newRoundCh := cs.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1)
|
newRoundCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewRound(), 1)
|
||||||
|
|
||||||
startTestRound(cs, height, round)
|
startTestRound(cs, height, round)
|
||||||
|
|
||||||
@ -261,7 +261,7 @@ func TestFullRoundNil(t *testing.T) {
|
|||||||
cs, vss := simpleConsensusState(1)
|
cs, vss := simpleConsensusState(1)
|
||||||
height, round := cs.Height, cs.Round
|
height, round := cs.Height, cs.Round
|
||||||
|
|
||||||
voteCh := cs.evsw.SubscribeToEvent("tester", types.EventStringVote(), 0)
|
voteCh := subscribeToEvent(cs.evsw, "tester", types.EventStringVote(), 1)
|
||||||
|
|
||||||
cs.enterPrevote(height, round)
|
cs.enterPrevote(height, round)
|
||||||
cs.startRoutines(4)
|
cs.startRoutines(4)
|
||||||
@ -280,8 +280,8 @@ func TestFullRound2(t *testing.T) {
|
|||||||
cs2 := vss[1]
|
cs2 := vss[1]
|
||||||
height, round := cs1.Height, cs1.Round
|
height, round := cs1.Height, cs1.Round
|
||||||
|
|
||||||
voteCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringVote(), 0)
|
voteCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringVote(), 1)
|
||||||
newBlockCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewBlock(), 0)
|
newBlockCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringNewBlock(), 1)
|
||||||
|
|
||||||
// start round and wait for propose and prevote
|
// start round and wait for propose and prevote
|
||||||
startTestRound(cs1, height, round)
|
startTestRound(cs1, height, round)
|
||||||
@ -321,11 +321,11 @@ func TestLockNoPOL(t *testing.T) {
|
|||||||
cs2 := vss[1]
|
cs2 := vss[1]
|
||||||
height := cs1.Height
|
height := cs1.Height
|
||||||
|
|
||||||
timeoutProposeCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutPropose(), 0)
|
timeoutProposeCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutPropose(), 1)
|
||||||
timeoutWaitCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutWait(), 0)
|
timeoutWaitCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutWait(), 1)
|
||||||
voteCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringVote(), 0)
|
voteCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringVote(), 1)
|
||||||
proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0)
|
proposalCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringCompleteProposal(), 1)
|
||||||
newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1)
|
newRoundCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringNewRound(), 1)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Round1 (cs1, B) // B B // B B2
|
Round1 (cs1, B) // B B // B B2
|
||||||
@ -483,12 +483,12 @@ func TestLockPOLRelock(t *testing.T) {
|
|||||||
cs1, vss := simpleConsensusState(4)
|
cs1, vss := simpleConsensusState(4)
|
||||||
cs2, cs3, cs4 := vss[1], vss[2], vss[3]
|
cs2, cs3, cs4 := vss[1], vss[2], vss[3]
|
||||||
|
|
||||||
timeoutProposeCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutPropose(), 0)
|
timeoutProposeCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutPropose(), 1)
|
||||||
timeoutWaitCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutWait(), 0)
|
timeoutWaitCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutWait(), 1)
|
||||||
proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0)
|
proposalCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringCompleteProposal(), 1)
|
||||||
voteCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringVote(), 0)
|
voteCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringVote(), 1)
|
||||||
newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1)
|
newRoundCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringNewRound(), 1)
|
||||||
newBlockCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewBlock(), 0)
|
newBlockCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringNewBlock(), 1)
|
||||||
|
|
||||||
log.Debug("cs2 last round", "lr", cs2.PrivValidator.LastRound)
|
log.Debug("cs2 last round", "lr", cs2.PrivValidator.LastRound)
|
||||||
|
|
||||||
@ -591,11 +591,11 @@ func TestLockPOLUnlock(t *testing.T) {
|
|||||||
cs1, vss := simpleConsensusState(4)
|
cs1, vss := simpleConsensusState(4)
|
||||||
cs2, cs3, cs4 := vss[1], vss[2], vss[3]
|
cs2, cs3, cs4 := vss[1], vss[2], vss[3]
|
||||||
|
|
||||||
proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0)
|
proposalCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringCompleteProposal(), 1)
|
||||||
timeoutProposeCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutPropose(), 0)
|
timeoutProposeCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutPropose(), 1)
|
||||||
timeoutWaitCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutWait(), 0)
|
timeoutWaitCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutWait(), 1)
|
||||||
newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1)
|
newRoundCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringNewRound(), 1)
|
||||||
unlockCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringUnlock(), 0)
|
unlockCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringUnlock(), 1)
|
||||||
voteCh := subscribeToVoter(cs1, cs1.privValidator.Address)
|
voteCh := subscribeToVoter(cs1, cs1.privValidator.Address)
|
||||||
|
|
||||||
// everything done from perspective of cs1
|
// everything done from perspective of cs1
|
||||||
@ -682,10 +682,10 @@ func TestLockPOLSafety1(t *testing.T) {
|
|||||||
cs1, vss := simpleConsensusState(4)
|
cs1, vss := simpleConsensusState(4)
|
||||||
cs2, cs3, cs4 := vss[1], vss[2], vss[3]
|
cs2, cs3, cs4 := vss[1], vss[2], vss[3]
|
||||||
|
|
||||||
proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0)
|
proposalCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringCompleteProposal(), 1)
|
||||||
timeoutProposeCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutPropose(), 0)
|
timeoutProposeCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutPropose(), 1)
|
||||||
timeoutWaitCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutWait(), 0)
|
timeoutWaitCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutWait(), 1)
|
||||||
newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1)
|
newRoundCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringNewRound(), 1)
|
||||||
voteCh := subscribeToVoter(cs1, cs1.privValidator.Address)
|
voteCh := subscribeToVoter(cs1, cs1.privValidator.Address)
|
||||||
|
|
||||||
// start round and wait for propose and prevote
|
// start round and wait for propose and prevote
|
||||||
@ -780,7 +780,7 @@ func TestLockPOLSafety1(t *testing.T) {
|
|||||||
// we should prevote what we're locked on
|
// we should prevote what we're locked on
|
||||||
validatePrevote(t, cs1, 2, vss[0], propBlockHash)
|
validatePrevote(t, cs1, 2, vss[0], propBlockHash)
|
||||||
|
|
||||||
newStepCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRoundStep(), 0)
|
newStepCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringNewRoundStep(), 1)
|
||||||
|
|
||||||
// add prevotes from the earlier round
|
// add prevotes from the earlier round
|
||||||
addVoteToFromMany(cs1, prevotes, cs2, cs3, cs4)
|
addVoteToFromMany(cs1, prevotes, cs2, cs3, cs4)
|
||||||
@ -801,11 +801,11 @@ func TestLockPOLSafety2(t *testing.T) {
|
|||||||
cs1, vss := simpleConsensusState(4)
|
cs1, vss := simpleConsensusState(4)
|
||||||
cs2, cs3, cs4 := vss[1], vss[2], vss[3]
|
cs2, cs3, cs4 := vss[1], vss[2], vss[3]
|
||||||
|
|
||||||
proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0)
|
proposalCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringCompleteProposal(), 1)
|
||||||
timeoutProposeCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutPropose(), 0)
|
timeoutProposeCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutPropose(), 1)
|
||||||
timeoutWaitCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutWait(), 0)
|
timeoutWaitCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutWait(), 1)
|
||||||
newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1)
|
newRoundCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringNewRound(), 1)
|
||||||
unlockCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringUnlock(), 0)
|
unlockCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringUnlock(), 1)
|
||||||
voteCh := subscribeToVoter(cs1, cs1.privValidator.Address)
|
voteCh := subscribeToVoter(cs1, cs1.privValidator.Address)
|
||||||
|
|
||||||
// the block for R0: gets polkad but we miss it
|
// the block for R0: gets polkad but we miss it
|
||||||
@ -892,9 +892,9 @@ func TestSlashingPrevotes(t *testing.T) {
|
|||||||
cs2 := vss[1]
|
cs2 := vss[1]
|
||||||
|
|
||||||
|
|
||||||
proposalCh := cs1.evsw.SubscribeToEvent("tester",types.EventStringCompleteProposal() , 0)
|
proposalCh := subscribeToEvent(cs1.evsw,"tester",types.EventStringCompleteProposal() , 1)
|
||||||
timeoutWaitCh := cs1.evsw.SubscribeToEvent("tester",types.EventStringTimeoutWait() , 0)
|
timeoutWaitCh := subscribeToEvent(cs1.evsw,"tester",types.EventStringTimeoutWait() , 1)
|
||||||
newRoundCh := cs1.evsw.SubscribeToEvent("tester",types.EventStringNewRound() , 1)
|
newRoundCh := subscribeToEvent(cs1.evsw,"tester",types.EventStringNewRound() , 1)
|
||||||
voteCh := subscribeToVoter(cs1, cs1.privValidator.Address)
|
voteCh := subscribeToVoter(cs1, cs1.privValidator.Address)
|
||||||
|
|
||||||
// start round and wait for propose and prevote
|
// start round and wait for propose and prevote
|
||||||
@ -927,9 +927,9 @@ func TestSlashingPrecommits(t *testing.T) {
|
|||||||
cs2 := vss[1]
|
cs2 := vss[1]
|
||||||
|
|
||||||
|
|
||||||
proposalCh := cs1.evsw.SubscribeToEvent("tester",types.EventStringCompleteProposal() , 0)
|
proposalCh := subscribeToEvent(cs1.evsw,"tester",types.EventStringCompleteProposal() , 1)
|
||||||
timeoutWaitCh := cs1.evsw.SubscribeToEvent("tester",types.EventStringTimeoutWait() , 0)
|
timeoutWaitCh := subscribeToEvent(cs1.evsw,"tester",types.EventStringTimeoutWait() , 1)
|
||||||
newRoundCh := cs1.evsw.SubscribeToEvent("tester",types.EventStringNewRound() , 1)
|
newRoundCh := subscribeToEvent(cs1.evsw,"tester",types.EventStringNewRound() , 1)
|
||||||
voteCh := subscribeToVoter(cs1, cs1.privValidator.Address)
|
voteCh := subscribeToVoter(cs1, cs1.privValidator.Address)
|
||||||
|
|
||||||
// start round and wait for propose and prevote
|
// start round and wait for propose and prevote
|
||||||
@ -971,10 +971,10 @@ func TestHalt1(t *testing.T) {
|
|||||||
cs1, vss := simpleConsensusState(4)
|
cs1, vss := simpleConsensusState(4)
|
||||||
cs2, cs3, cs4 := vss[1], vss[2], vss[3]
|
cs2, cs3, cs4 := vss[1], vss[2], vss[3]
|
||||||
|
|
||||||
proposalCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringCompleteProposal(), 0)
|
proposalCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringCompleteProposal(), 1)
|
||||||
timeoutWaitCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringTimeoutWait(), 0)
|
timeoutWaitCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutWait(), 1)
|
||||||
newRoundCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewRound(), 1)
|
newRoundCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringNewRound(), 1)
|
||||||
newBlockCh := cs1.evsw.SubscribeToEvent("tester", types.EventStringNewBlock(), 0)
|
newBlockCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringNewBlock(), 1)
|
||||||
voteCh := subscribeToVoter(cs1, cs1.privValidator.Address)
|
voteCh := subscribeToVoter(cs1, cs1.privValidator.Address)
|
||||||
|
|
||||||
// start round and wait for propose and prevote
|
// start round and wait for propose and prevote
|
||||||
|
Reference in New Issue
Block a user