From 27909e5d2aee605a7cb562a53195b2e99d78eb8e Mon Sep 17 00:00:00 2001 From: Anton Kaliaev Date: Tue, 7 May 2019 12:25:35 +0400 Subject: [PATCH] mempool: remove only valid (Code==0) txs on Update (#3625) * mempool: remove only valid (Code==0) txs on Update so evil proposers can't drop valid txs in Commit stage. Also remove invalid (Code!=0) txs from the cache so they can be resubmitted. Fixes #3322 @rickyyangz: In the end of commit stage, we will update mempool to remove all the txs in current block. // Update mempool. err = blockExec.mempool.Update( block.Height, block.Txs, TxPreCheck(state), TxPostCheck(state), ) Assum an account has 3 transactions in the mempool, the sequences are 100, 101 and 102 separately, So an evil proposal can only package the 101 and 102 transactions into its proposal block, and leave 100 still in mempool, then the two txs will be removed from all validators' mempool when commit. So the account lost the two valid txs. @ebuchman: In the longer term we may want to do something like #2639 so we can validate txs before we commit the block. But even in this case we'd only want to run the equivalent of CheckTx, which means the DeliverTx could still fail even if the CheckTx passes depending on how the app handles the ABCI Code semantics. So more work will be required around the ABCI code. See also #2185 * add changelog entry and tests * improve changelog message * reformat code --- CHANGELOG_PENDING.md | 4 ++ mempool/cache_test.go | 3 +- mempool/clist_mempool.go | 70 +++++++++++++++++------------------ mempool/clist_mempool_test.go | 50 +++++++++++++++++++------ mempool/mempool.go | 2 +- mock/mempool.go | 1 + state/execution.go | 4 +- 7 files changed, 84 insertions(+), 50 deletions(-) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 3b6dffb3..d03d9811 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -48,3 +48,7 @@ * `Switch#DialPeerWithAddress` now only takes an address - [consensus] \#3067 getBeginBlockValidatorInfo loads validators from stateDB instead of state (@james-ray) - [pex] \#3603 Dial seeds when addrbook needs more addresses (@defunctzombie) +- [mempool] \#3322 Remove only valid (Code==0) txs on Update + * `Mempool#Update` and `BlockExecutor#Commit` now accept + `[]*abci.ResponseDeliverTx` - list of `DeliverTx` responses, which should + match `block.Txs` diff --git a/mempool/cache_test.go b/mempool/cache_test.go index ea9f63fd..539bf119 100644 --- a/mempool/cache_test.go +++ b/mempool/cache_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/require" "github.com/tendermint/tendermint/abci/example/kvstore" + abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/types" ) @@ -66,7 +67,7 @@ func TestCacheAfterUpdate(t *testing.T) { tx := types.Tx{byte(v)} updateTxs = append(updateTxs, tx) } - mempool.Update(int64(tcIndex), updateTxs, nil, nil) + mempool.Update(int64(tcIndex), updateTxs, abciResponses(len(updateTxs), abci.CodeTypeOK), nil, nil) for _, v := range tc.reAddIndices { tx := types.Tx{byte(v)} diff --git a/mempool/clist_mempool.go b/mempool/clist_mempool.go index 2a4bacbb..c6ff475a 100644 --- a/mempool/clist_mempool.go +++ b/mempool/clist_mempool.go @@ -519,6 +519,7 @@ func (mem *CListMempool) ReapMaxTxs(max int) types.Txs { func (mem *CListMempool) Update( height int64, txs types.Txs, + deliverTxResponses []*abci.ResponseDeliverTx, preCheck PreCheckFunc, postCheck PostCheckFunc, ) error { @@ -533,20 +534,37 @@ func (mem *CListMempool) Update( mem.postCheck = postCheck } - // Add committed transactions to cache (if missing). - for _, tx := range txs { - _ = mem.cache.Push(tx) - } + for i, tx := range txs { + if deliverTxResponses[i].Code == abci.CodeTypeOK { + // Add valid committed tx to the cache (if missing). + _ = mem.cache.Push(tx) - // Remove committed transactions. - txsLeft := mem.removeTxs(txs) + // Remove valid committed tx from the mempool. + if e, ok := mem.txsMap.Load(txKey(tx)); ok { + mem.removeTx(tx, e.(*clist.CElement), false) + } + } else { + // Allow invalid transactions to be resubmitted. + mem.cache.Remove(tx) + + // Don't remove invalid tx from the mempool. + // Otherwise evil proposer can drop valid txs. + // Example: + // 100 -> 101 -> 102 + // Block, proposed by evil proposer: + // 101 -> 102 + // Mempool (if you remove txs): + // 100 + // https://github.com/tendermint/tendermint/issues/3322. + } + } // Either recheck non-committed txs to see if they became invalid // or just notify there're some txs left. - if len(txsLeft) > 0 { + if mem.Size() > 0 { if mem.config.Recheck { - mem.logger.Info("Recheck txs", "numtxs", len(txsLeft), "height", height) - mem.recheckTxs(txsLeft) + mem.logger.Info("Recheck txs", "numtxs", mem.Size(), "height", height) + mem.recheckTxs() // At this point, mem.txs are being rechecked. // mem.recheckCursor re-scans mem.txs and possibly removes some txs. // Before mem.Reap(), we should wait for mem.recheckCursor to be nil. @@ -561,42 +579,22 @@ func (mem *CListMempool) Update( return nil } -func (mem *CListMempool) removeTxs(txs types.Txs) []types.Tx { - // Build a map for faster lookups. - txsMap := make(map[string]struct{}, len(txs)) - for _, tx := range txs { - txsMap[string(tx)] = struct{}{} +func (mem *CListMempool) recheckTxs() { + if mem.Size() == 0 { + panic("recheckTxs is called, but the mempool is empty") } - txsLeft := make([]types.Tx, 0, mem.txs.Len()) - for e := mem.txs.Front(); e != nil; e = e.Next() { - memTx := e.Value.(*mempoolTx) - // Remove the tx if it's already in a block. - if _, ok := txsMap[string(memTx.tx)]; ok { - // NOTE: we don't remove committed txs from the cache. - mem.removeTx(memTx.tx, e, false) - - continue - } - txsLeft = append(txsLeft, memTx.tx) - } - return txsLeft -} - -// NOTE: pass in txs because mem.txs can mutate concurrently. -func (mem *CListMempool) recheckTxs(txs []types.Tx) { - if len(txs) == 0 { - return - } atomic.StoreInt32(&mem.rechecking, 1) mem.recheckCursor = mem.txs.Front() mem.recheckEnd = mem.txs.Back() // Push txs to proxyAppConn // NOTE: globalCb may be called concurrently. - for _, tx := range txs { - mem.proxyAppConn.CheckTxAsync(tx) + for e := mem.txs.Front(); e != nil; e = e.Next() { + memTx := e.Value.(*mempoolTx) + mem.proxyAppConn.CheckTxAsync(memTx.tx) } + mem.proxyAppConn.FlushAsync() } diff --git a/mempool/clist_mempool_test.go b/mempool/clist_mempool_test.go index 69d4eb68..3eb4990b 100644 --- a/mempool/clist_mempool_test.go +++ b/mempool/clist_mempool_test.go @@ -170,22 +170,42 @@ func TestMempoolFilters(t *testing.T) { {10, PreCheckAminoMaxBytes(22), PostCheckMaxGas(0), 0}, } for tcIndex, tt := range tests { - mempool.Update(1, emptyTxArr, tt.preFilter, tt.postFilter) + mempool.Update(1, emptyTxArr, abciResponses(len(emptyTxArr), abci.CodeTypeOK), tt.preFilter, tt.postFilter) checkTxs(t, mempool, tt.numTxsToCreate, UnknownPeerID) require.Equal(t, tt.expectedNumTxs, mempool.Size(), "mempool had the incorrect size, on test case %d", tcIndex) mempool.Flush() } } -func TestMempoolUpdateAddsTxsToCache(t *testing.T) { +func TestMempoolUpdate(t *testing.T) { app := kvstore.NewKVStoreApplication() cc := proxy.NewLocalClientCreator(app) mempool, cleanup := newMempoolWithApp(cc) defer cleanup() - mempool.Update(1, []types.Tx{[]byte{0x01}}, nil, nil) - err := mempool.CheckTx([]byte{0x01}, nil) - if assert.Error(t, err) { - assert.Equal(t, ErrTxInCache, err) + + // 1. Adds valid txs to the cache + { + mempool.Update(1, []types.Tx{[]byte{0x01}}, abciResponses(1, abci.CodeTypeOK), nil, nil) + err := mempool.CheckTx([]byte{0x01}, nil) + if assert.Error(t, err) { + assert.Equal(t, ErrTxInCache, err) + } + } + + // 2. Removes valid txs from the mempool + { + err := mempool.CheckTx([]byte{0x02}, nil) + require.NoError(t, err) + mempool.Update(1, []types.Tx{[]byte{0x02}}, abciResponses(1, abci.CodeTypeOK), nil, nil) + assert.Zero(t, mempool.Size()) + } + + // 3. Removes invalid transactions from the cache, but leaves them in the mempool (if present) + { + err := mempool.CheckTx([]byte{0x03}, nil) + require.NoError(t, err) + mempool.Update(1, []types.Tx{[]byte{0x03}}, abciResponses(1, 1), nil, nil) + assert.Equal(t, 1, mempool.Size()) } } @@ -210,7 +230,7 @@ func TestTxsAvailable(t *testing.T) { // it should fire once now for the new height // since there are still txs left committedTxs, txs := txs[:50], txs[50:] - if err := mempool.Update(1, committedTxs, nil, nil); err != nil { + if err := mempool.Update(1, committedTxs, abciResponses(len(committedTxs), abci.CodeTypeOK), nil, nil); err != nil { t.Error(err) } ensureFire(t, mempool.TxsAvailable(), timeoutMS) @@ -222,7 +242,7 @@ func TestTxsAvailable(t *testing.T) { // now call update with all the txs. it should not fire as there are no txs left committedTxs = append(txs, moreTxs...) - if err := mempool.Update(2, committedTxs, nil, nil); err != nil { + if err := mempool.Update(2, committedTxs, abciResponses(len(committedTxs), abci.CodeTypeOK), nil, nil); err != nil { t.Error(err) } ensureNoFire(t, mempool.TxsAvailable(), timeoutMS) @@ -281,7 +301,7 @@ func TestSerialReap(t *testing.T) { binary.BigEndian.PutUint64(txBytes, uint64(i)) txs = append(txs, txBytes) } - if err := mempool.Update(0, txs, nil, nil); err != nil { + if err := mempool.Update(0, txs, abciResponses(len(txs), abci.CodeTypeOK), nil, nil); err != nil { t.Error(err) } } @@ -462,7 +482,7 @@ func TestMempoolTxsBytes(t *testing.T) { assert.EqualValues(t, 1, mempool.TxsBytes()) // 3. zero again after tx is removed by Update - mempool.Update(1, []types.Tx{[]byte{0x01}}, nil, nil) + mempool.Update(1, []types.Tx{[]byte{0x01}}, abciResponses(1, abci.CodeTypeOK), nil, nil) assert.EqualValues(t, 0, mempool.TxsBytes()) // 4. zero after Flush @@ -507,7 +527,7 @@ func TestMempoolTxsBytes(t *testing.T) { require.NotEmpty(t, res2.Data) // Pretend like we committed nothing so txBytes gets rechecked and removed. - mempool.Update(1, []types.Tx{}, nil, nil) + mempool.Update(1, []types.Tx{}, abciResponses(0, abci.CodeTypeOK), nil, nil) assert.EqualValues(t, 0, mempool.TxsBytes()) } @@ -570,3 +590,11 @@ func checksumFile(p string, t *testing.T) string { require.Nil(t, err, "expecting successful read of %q", p) return checksumIt(data) } + +func abciResponses(n int, code uint32) []*abci.ResponseDeliverTx { + responses := make([]*abci.ResponseDeliverTx, 0, n) + for i := 0; i < n; i++ { + responses = append(responses, &abci.ResponseDeliverTx{Code: code}) + } + return responses +} diff --git a/mempool/mempool.go b/mempool/mempool.go index 1c1873d4..0995c734 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -43,7 +43,7 @@ type Mempool interface { // Update informs the mempool that the given txs were committed and can be discarded. // NOTE: this should be called *after* block is committed by consensus. // NOTE: unsafe; Lock/Unlock must be managed by caller - Update(blockHeight int64, blockTxs types.Txs, newPreFn PreCheckFunc, newPostFn PostCheckFunc) error + Update(blockHeight int64, blockTxs types.Txs, deliverTxResponses []*abci.ResponseDeliverTx, newPreFn PreCheckFunc, newPostFn PostCheckFunc) error // FlushAppConn flushes the mempool connection to ensure async reqResCb calls are // done. E.g. from CheckTx. diff --git a/mock/mempool.go b/mock/mempool.go index 66ad68d3..cebe156b 100644 --- a/mock/mempool.go +++ b/mock/mempool.go @@ -27,6 +27,7 @@ func (Mempool) ReapMaxTxs(n int) types.Txs { return types.Txs{} } func (Mempool) Update( _ int64, _ types.Txs, + _ []*abci.ResponseDeliverTx, _ mempl.PreCheckFunc, _ mempl.PostCheckFunc, ) error { diff --git a/state/execution.go b/state/execution.go index d872145e..7e49a9ad 100644 --- a/state/execution.go +++ b/state/execution.go @@ -156,7 +156,7 @@ func (blockExec *BlockExecutor) ApplyBlock(state State, blockID types.BlockID, b } // Lock mempool, commit app state, update mempoool. - appHash, err := blockExec.Commit(state, block) + appHash, err := blockExec.Commit(state, block, abciResponses.DeliverTx) if err != nil { return state, fmt.Errorf("Commit failed for application: %v", err) } @@ -188,6 +188,7 @@ func (blockExec *BlockExecutor) ApplyBlock(state State, blockID types.BlockID, b func (blockExec *BlockExecutor) Commit( state State, block *types.Block, + deliverTxResponses []*abci.ResponseDeliverTx, ) ([]byte, error) { blockExec.mempool.Lock() defer blockExec.mempool.Unlock() @@ -222,6 +223,7 @@ func (blockExec *BlockExecutor) Commit( err = blockExec.mempool.Update( block.Height, block.Txs, + deliverTxResponses, TxPreCheck(state), TxPostCheck(state), )