From 678a9a2e428b68c4a561c0af7d2e9cd192af876c Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Thu, 13 Jul 2017 15:03:19 -0400 Subject: [PATCH] TxsAvailable tests --- consensus/common_test.go | 16 +++++-- consensus/mempool_test.go | 43 ++++++++++++------ mempool/mempool_test.go | 95 ++++++++++++++++++++++++++++++++++++--- types/services.go | 2 + 4 files changed, 132 insertions(+), 24 deletions(-) diff --git a/consensus/common_test.go b/consensus/common_test.go index 103294ab..63f77b09 100644 --- a/consensus/common_test.go +++ b/consensus/common_test.go @@ -294,12 +294,22 @@ func randConsensusState(nValidators int) (*ConsensusState, []*validatorStub) { //------------------------------------------------------------------------------- func ensureNoNewStep(stepCh chan interface{}) { - timeout := time.NewTicker(ensureTimeout * time.Second) + timer := time.NewTimer(ensureTimeout * time.Second) select { - case <-timeout.C: + case <-timer.C: break case <-stepCh: - panic("We should be stuck waiting for more votes, not moving to the next step") + panic("We should be stuck waiting, not moving to the next step") + } +} + +func ensureNewStep(stepCh chan interface{}) { + timer := time.NewTimer(ensureTimeout * time.Second) + select { + case <-timer.C: + panic("We shouldnt be stuck waiting") + case <-stepCh: + break } } diff --git a/consensus/mempool_test.go b/consensus/mempool_test.go index 327ad733..9f332e15 100644 --- a/consensus/mempool_test.go +++ b/consensus/mempool_test.go @@ -15,6 +15,34 @@ func init() { config = ResetConfig("consensus_mempool_test") } +func TestTxsAvailable(t *testing.T) { + config := ResetConfig("consensus_mempool_txs_available_test") + config.Consensus.NoEmptyBlocks = true + state, privVals := randGenesisState(1, false, 10) + cs := newConsensusStateWithConfig(config, state, privVals[0], NewCounterApplication()) + cs.mempool.FireOnTxsAvailable() + height, round := cs.Height, cs.Round + newBlockCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewBlock(), 1) + startTestRound(cs, height, round) + + // we shouldnt make progress until theres a tx + ensureNoNewStep(newBlockCh) + deliverTxsRange(cs, 0, 100) + ensureNewStep(newBlockCh) +} + +func deliverTxsRange(cs *ConsensusState, start, end int) { + // Deliver some txs. + for i := start; i < end; i++ { + txBytes := make([]byte, 8) + binary.BigEndian.PutUint64(txBytes, uint64(i)) + err := cs.mempool.CheckTx(txBytes, nil) + if err != nil { + panic(Fmt("Error after CheckTx: %v", err)) + } + } +} + func TestTxConcurrentWithCommit(t *testing.T) { state, privVals := randGenesisState(1, false, 10) @@ -22,21 +50,8 @@ func TestTxConcurrentWithCommit(t *testing.T) { height, round := cs.Height, cs.Round newBlockCh := subscribeToEvent(cs.evsw, "tester", types.EventStringNewBlock(), 1) - deliverTxsRange := func(start, end int) { - // Deliver some txs. - for i := start; i < end; i++ { - txBytes := make([]byte, 8) - binary.BigEndian.PutUint64(txBytes, uint64(i)) - err := cs.mempool.CheckTx(txBytes, nil) - if err != nil { - panic(Fmt("Error after CheckTx: %v", err)) - } - // time.Sleep(time.Microsecond * time.Duration(rand.Int63n(3000))) - } - } - NTxs := 10000 - go deliverTxsRange(0, NTxs) + go deliverTxsRange(cs, 0, NTxs) startTestRound(cs, height, round) ticker := time.NewTicker(time.Second * 20) diff --git a/mempool/mempool_test.go b/mempool/mempool_test.go index 6451adb2..6da9eff5 100644 --- a/mempool/mempool_test.go +++ b/mempool/mempool_test.go @@ -1,34 +1,115 @@ package mempool import ( + "crypto/rand" "encoding/binary" "testing" + "time" "github.com/tendermint/abci/example/counter" + "github.com/tendermint/abci/example/dummy" + "github.com/tendermint/tmlibs/log" + cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/types" - "github.com/tendermint/tmlibs/log" ) -func TestSerialReap(t *testing.T) { +func newMempoolWithApp(t *testing.T, cc proxy.ClientCreator) *Mempool { config := cfg.ResetTestRoot("mempool_test") - app := counter.NewCounterApplication(true) - app.SetOption("serial", "on") - cc := proxy.NewLocalClientCreator(app) appConnMem, _ := cc.NewABCIClient() appConnMem.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "mempool")) if _, err := appConnMem.Start(); err != nil { t.Fatalf("Error starting ABCI client: %v", err.Error()) } + mempool := NewMempool(config.Mempool, appConnMem) + mempool.SetLogger(log.TestingLogger()) + return mempool +} + +func ensureNoFire(t *testing.T, ch chan struct{}, timeoutMS int) { + timer := time.NewTimer(time.Duration(timeoutMS) * time.Millisecond) + select { + case <-ch: + t.Fatal("Expected not to fire") + case <-timer.C: + } +} + +func ensureFire(t *testing.T, ch chan struct{}, timeoutMS int) { + timer := time.NewTimer(time.Duration(timeoutMS) * time.Millisecond) + select { + case <-ch: + case <-timer.C: + t.Fatal("Expected to fire") + } +} + +func sendTxs(t *testing.T, mempool *Mempool, count int) types.Txs { + txs := make(types.Txs, count) + for i := 0; i < count; i++ { + txBytes := make([]byte, 20) + txs[i] = txBytes + rand.Read(txBytes) + err := mempool.CheckTx(txBytes, nil) + if err != nil { + t.Fatal("Error after CheckTx: %v", err) + } + } + return txs +} + +func TestTxsAvailable(t *testing.T) { + app := dummy.NewDummyApplication() + cc := proxy.NewLocalClientCreator(app) + mempool := newMempoolWithApp(t, cc) + mempool.FireOnTxsAvailable() + + timeoutMS := 500 + + // with no txs, it shouldnt fire + ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) + + // send a bunch of txs, it should only fire once + txs := sendTxs(t, mempool, 100) + ensureFire(t, mempool.TxsAvailable(), timeoutMS) + ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) + + // call update with half the txs. + // it should fire once now for the new height + // since there are still txs left + committedTxs, txs := txs[:50], txs[50:] + mempool.Update(1, committedTxs) + ensureFire(t, mempool.TxsAvailable(), timeoutMS) + ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) + + // send a bunch more txs. we already fired for this height so it shouldnt fire again + moreTxs := sendTxs(t, mempool, 50) + ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) + + // now call update with all the txs. it should not fire as there are no txs left + committedTxs = append(txs, moreTxs...) + mempool.Update(2, committedTxs) + ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) + + // send a bunch more txs, it should only fire once + sendTxs(t, mempool, 100) + ensureFire(t, mempool.TxsAvailable(), timeoutMS) + ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) +} + +func TestSerialReap(t *testing.T) { + app := counter.NewCounterApplication(true) + app.SetOption("serial", "on") + cc := proxy.NewLocalClientCreator(app) + + mempool := newMempoolWithApp(t, cc) appConnCon, _ := cc.NewABCIClient() appConnCon.SetLogger(log.TestingLogger().With("module", "abci-client", "connection", "consensus")) if _, err := appConnCon.Start(); err != nil { t.Fatalf("Error starting ABCI client: %v", err.Error()) } - mempool := NewMempool(config.Mempool, appConnMem) - mempool.SetLogger(log.TestingLogger()) deliverTxsRange := func(start, end int) { // Deliver some txs. diff --git a/types/services.go b/types/services.go index 77c84a09..f68702ec 100644 --- a/types/services.go +++ b/types/services.go @@ -25,6 +25,7 @@ type Mempool interface { Flush() TxsAvailable() chan struct{} + FireOnTxsAvailable() } type MockMempool struct { @@ -38,6 +39,7 @@ func (m MockMempool) Reap(n int) Txs { return Txs{ func (m MockMempool) Update(height int, txs Txs) {} func (m MockMempool) Flush() {} func (m MockMempool) TxsAvailable() chan struct{} { return make(chan struct{}) } +func (m MockMempool) FireOnTxsAvailable() {} //------------------------------------------------------ // blockstore