From 4beac54bd9e2ae670aaf766887a779ea4f5ecb26 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Wed, 12 Jul 2017 01:02:16 -0400 Subject: [PATCH] no empty blocks --- consensus/state.go | 48 +++++++++++++++++++++++++++++++++++++++++----- mempool/mempool.go | 23 +++++++++++++++++++++- node/node.go | 1 + types/services.go | 3 +++ 4 files changed, 69 insertions(+), 6 deletions(-) diff --git a/consensus/state.go b/consensus/state.go index 6259a620..6378297a 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -10,13 +10,15 @@ import ( "time" fail "github.com/ebuchman/fail-test" + wire "github.com/tendermint/go-wire" + cmn "github.com/tendermint/tmlibs/common" + "github.com/tendermint/tmlibs/log" + cfg "github.com/tendermint/tendermint/config" "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" "github.com/tendermint/tendermint/types" - cmn "github.com/tendermint/tmlibs/common" - "github.com/tendermint/tmlibs/log" ) //----------------------------------------------------------------------------- @@ -225,6 +227,9 @@ type ConsensusState struct { doPrevote func(height, round int) setProposal func(proposal *types.Proposal) error + // signifies that txs are available for proposal + txsAvailable chan RoundState + // closed when we finish shutting down done chan struct{} } @@ -239,6 +244,7 @@ func NewConsensusState(config *cfg.ConsensusConfig, state *sm.State, proxyAppCon peerMsgQueue: make(chan msgInfo, msgQueueSize), internalMsgQueue: make(chan msgInfo, msgQueueSize), timeoutTicker: NewTimeoutTicker(), + txsAvailable: make(chan RoundState), done: make(chan struct{}), } // set function defaults (may be overwritten before calling Start) @@ -619,6 +625,8 @@ func (cs *ConsensusState) receiveRoutine(maxSteps int) { var mi msgInfo select { + case rs_ := <-cs.txsAvailable: + cs.enterPropose(rs_.Height, rs_.Round) case mi = <-cs.peerMsgQueue: cs.wal.Save(mi) // handles proposals, block parts, votes @@ -770,11 +778,41 @@ func (cs *ConsensusState) enterNewRound(height int, round int) { types.FireEventNewRound(cs.evsw, cs.RoundStateEvent()) - // Immediately go to enterPropose. - cs.enterPropose(height, round) + // Wait for txs to be available in the mempool + // before we enterPropose + go cs.waitForTxs(height, round) } -// Enter: from enterNewRound(height,round). +func (cs *ConsensusState) waitForTxs(height, round int) { + // if we're the proposer, start a heartbeat routine + // to tell other peers we're just waiting for txs (for debugging) + done := make(chan struct{}) + defer close(done) + if cs.isProposer() { + go cs.proposerHeartbeat(done) + } + + // wait for the mempool to have some txs + <-cs.mempool.TxsAvailable() + + // now we can enterPropose + cs.txsAvailable <- RoundState{Height: height, Round: round} +} + +func (cs *ConsensusState) proposerHeartbeat(done chan struct{}) { + for { + select { + case <-done: + return + default: + // TODO: broadcast heartbeat + + time.Sleep(time.Second) + } + } +} + +// Enter: from enter NewRound(height,round), once txs are in the mempool func (cs *ConsensusState) enterPropose(height int, round int) { if cs.Height != height || round < cs.Round || (cs.Round == round && RoundStepPropose <= cs.Step) { cs.Logger.Debug(cmn.Fmt("enterPropose(%v/%v): Invalid args. Current step: %v/%v/%v", height, round, cs.Height, cs.Round, cs.Step)) diff --git a/mempool/mempool.go b/mempool/mempool.go index e804f9b3..e3c9e216 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -72,6 +72,10 @@ type Mempool struct { // A log of mempool txs wal *auto.AutoFile + // fires once for each height, when the mempool is not empty + txsAvailable chan struct{} + notifiedTxsAvailable bool + logger log.Logger } @@ -88,6 +92,7 @@ func NewMempool(config *cfg.MempoolConfig, proxyAppConn proxy.AppConnMempool) *M recheckEnd: nil, logger: log.NewNopLogger(), cache: newTxCache(cacheSize), + txsAvailable: make(chan struct{}, 1), } mempool.initWAL() proxyAppConn.SetResponseCallback(mempool.resCb) @@ -215,6 +220,7 @@ func (mem *Mempool) resCbNormal(req *abci.Request, res *abci.Response) { tx: req.GetCheckTx().Tx, } mem.txs.PushBack(memTx) + mem.alertIfTxsAvailable() } else { // ignore bad transaction mem.logger.Info("Bad Transaction", "res", r) @@ -256,12 +262,25 @@ func (mem *Mempool) resCbRecheck(req *abci.Request, res *abci.Response) { // Done! atomic.StoreInt32(&mem.rechecking, 0) mem.logger.Info("Done rechecking txs") + + mem.alertIfTxsAvailable() } default: // ignore other messages } } +func (mem *Mempool) alertIfTxsAvailable() { + if !mem.notifiedTxsAvailable && mem.Size() > 0 { + mem.notifiedTxsAvailable = true + mem.txsAvailable <- struct{}{} + } +} + +func (mem *Mempool) TxsAvailable() chan struct{} { + return mem.txsAvailable +} + // Reap returns a list of transactions currently in the mempool. // If maxTxs is -1, there is no cap on the number of returned transactions. func (mem *Mempool) Reap(maxTxs int) types.Txs { @@ -307,13 +326,15 @@ func (mem *Mempool) Update(height int, txs types.Txs) { // Set height mem.height = height + mem.notifiedTxsAvailable = false + // Remove transactions that are already in txs. goodTxs := mem.filterTxs(txsMap) // Recheck mempool txs if any txs were committed in the block // NOTE/XXX: in some apps a tx could be invalidated due to EndBlock, // so we really still do need to recheck, but this is for debugging if mem.config.Recheck && (mem.config.RecheckEmpty || len(txs) > 0) { - mem.logger.Info("Recheck txs", "numtxs", len(goodTxs)) + mem.logger.Info("Recheck txs", "numtxs", len(goodTxs), "height", height) mem.recheckTxs(goodTxs) // At this point, mem.txs are being rechecked. // mem.recheckCursor re-scans mem.txs and possibly removes some txs. diff --git a/node/node.go b/node/node.go index 672e384b..51390200 100644 --- a/node/node.go +++ b/node/node.go @@ -139,6 +139,7 @@ func NewNode(config *cfg.Config, privValidator *types.PrivValidator, clientCreat mempoolLogger := logger.With("module", "mempool") mempool := mempl.NewMempool(config.Mempool, proxyApp.Mempool()) mempool.SetLogger(mempoolLogger) + mempool.Update(state.LastBlockHeight, nil) mempoolReactor := mempl.NewMempoolReactor(config.Mempool, mempool) mempoolReactor.SetLogger(mempoolLogger) diff --git a/types/services.go b/types/services.go index ee20487e..77c84a09 100644 --- a/types/services.go +++ b/types/services.go @@ -23,6 +23,8 @@ type Mempool interface { Reap(int) Txs Update(height int, txs Txs) Flush() + + TxsAvailable() chan struct{} } type MockMempool struct { @@ -35,6 +37,7 @@ func (m MockMempool) CheckTx(tx Tx, cb func(*abci.Response)) error { return nil func (m MockMempool) Reap(n int) Txs { return Txs{} } func (m MockMempool) Update(height int, txs Txs) {} func (m MockMempool) Flush() {} +func (m MockMempool) TxsAvailable() chan struct{} { return make(chan struct{}) } //------------------------------------------------------ // blockstore