From 422d04c8bae80e103474d37bd5a5afd0061e8d72 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Sun, 31 Mar 2019 07:14:18 -0400 Subject: [PATCH] Bucky/mempool txsmap (#3512) * mempool: resCb -> globalCb * reqResCb takes an externalCb * failing test for #3509 * txsMap is sync.Map * update changelog --- CHANGELOG_PENDING.md | 11 +++- abci/client/socket_client.go | 21 ++++--- mempool/cache_test.go | 2 +- mempool/mempool.go | 117 +++++++++++++++++++++-------------- mempool/mempool_test.go | 50 +++++++++++++++ 5 files changed, 141 insertions(+), 60 deletions(-) diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 19954dd5..e0a294ac 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -1,6 +1,9 @@ -## v0.32.0 +## v0.31.2 -** +*March 30th, 2019* + +This release fixes a regression from v0.31.1 where Tendermint panics under +mempool load for external ABCI apps. ### BREAKING CHANGES: @@ -19,6 +22,8 @@ ### IMPROVEMENTS: -- [CircleCI] \#3497 Move release management to CircleCI +- [circle] \#3497 Move release management to CircleCI ### BUG FIXES: + +- [mempool] \#3512 Fix panic from concurrent access to txsMap, a regression for external ABCI apps introduced in v0.31.1 diff --git a/abci/client/socket_client.go b/abci/client/socket_client.go index 56267660..3b401bd3 100644 --- a/abci/client/socket_client.go +++ b/abci/client/socket_client.go @@ -26,16 +26,17 @@ var _ Client = (*socketClient)(nil) type socketClient struct { cmn.BaseService - reqQueue chan *ReqRes - flushTimer *cmn.ThrottleTimer + addr string mustConnect bool + conn net.Conn + + reqQueue chan *ReqRes + flushTimer *cmn.ThrottleTimer mtx sync.Mutex - addr string - conn net.Conn err error - reqSent *list.List - resCb func(*types.Request, *types.Response) // listens to all callbacks + reqSent *list.List // list of requests sent, waiting for response + resCb func(*types.Request, *types.Response) // called on all requests, if set. } @@ -86,6 +87,7 @@ func (cli *socketClient) OnStop() { cli.mtx.Lock() defer cli.mtx.Unlock() if cli.conn != nil { + // does this really need a mutex? cli.conn.Close() } @@ -207,12 +209,15 @@ func (cli *socketClient) didRecvResponse(res *types.Response) error { reqres.Done() // Release waiters cli.reqSent.Remove(next) // Pop first item from linked list - // Notify reqRes listener if set + // Notify reqRes listener if set (request specific callback). + // NOTE: it is possible this callback isn't set on the reqres object. + // at this point, in which case it will be called after, when it is set. + // TODO: should we move this after the resCb call so the order is always consistent? if cb := reqres.GetCallback(); cb != nil { cb(res) } - // Notify client listener if set + // Notify client listener if set (global callback). if cli.resCb != nil { cli.resCb(reqres.Request, res) } diff --git a/mempool/cache_test.go b/mempool/cache_test.go index 26e560b6..ea9f63fd 100644 --- a/mempool/cache_test.go +++ b/mempool/cache_test.go @@ -19,7 +19,7 @@ func TestCacheRemove(t *testing.T) { for i := 0; i < numTxs; i++ { // probability of collision is 2**-256 txBytes := make([]byte, 32) - rand.Read(txBytes) + rand.Read(txBytes) // nolint: gosec txs[i] = txBytes cache.Push(txBytes) // make sure its added to both the linked list and the map diff --git a/mempool/mempool.go b/mempool/mempool.go index bd3cbf7d..a5b14466 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -149,6 +149,11 @@ func TxID(tx []byte) string { return fmt.Sprintf("%X", types.Tx(tx).Hash()) } +// txKey is the fixed length array sha256 hash used as the key in maps. +func txKey(tx types.Tx) [sha256.Size]byte { + return sha256.Sum256(tx) +} + // Mempool is an ordered in-memory pool for transactions before they are proposed in a consensus // round. Transaction validity is checked using the CheckTx abci message before the transaction is // added to the pool. The Mempool uses a concurrent list structure for storing transactions that @@ -159,23 +164,27 @@ type Mempool struct { proxyMtx sync.Mutex proxyAppConn proxy.AppConnMempool txs *clist.CList // concurrent linked-list of good txs - // map for quick access to txs - // Used in CheckTx to record the tx sender. - txsMap map[[sha256.Size]byte]*clist.CElement - height int64 // the last block Update()'d to - rechecking int32 // for re-checking filtered txs on Update() - recheckCursor *clist.CElement // next expected response - recheckEnd *clist.CElement // re-checking stops here + preCheck PreCheckFunc + postCheck PostCheckFunc + + // Track whether we're rechecking txs. + // These are not protected by a mutex and are expected to be mutated + // in serial (ie. by abci responses which are called in serial). + recheckCursor *clist.CElement // next expected response + recheckEnd *clist.CElement // re-checking stops here + + // notify listeners (ie. consensus) when txs are available notifiedTxsAvailable bool txsAvailable chan struct{} // fires once for each height, when the mempool is not empty - preCheck PreCheckFunc - postCheck PostCheckFunc + + // Map for quick access to txs to record sender in CheckTx. + // txsMap: txKey -> CElement + txsMap sync.Map // Atomic integers - - // Used to check if the mempool size is bigger than the allowed limit. - // See TxsBytes - txsBytes int64 + height int64 // the last block Update()'d to + rechecking int32 // for re-checking filtered txs on Update() + txsBytes int64 // total size of mempool, in bytes // Keep a cache of already-seen txs. // This reduces the pressure on the proxyApp. @@ -203,7 +212,6 @@ func NewMempool( config: config, proxyAppConn: proxyAppConn, txs: clist.New(), - txsMap: make(map[[sha256.Size]byte]*clist.CElement), height: height, rechecking: 0, recheckCursor: nil, @@ -216,7 +224,7 @@ func NewMempool( } else { mempool.cache = nopTxCache{} } - proxyAppConn.SetResponseCallback(mempool.resCb) + proxyAppConn.SetResponseCallback(mempool.globalCb) for _, option := range options { option(mempool) } @@ -319,7 +327,7 @@ func (mem *Mempool) Flush() { e.DetachPrev() } - mem.txsMap = make(map[[sha256.Size]byte]*clist.CElement) + mem.txsMap = sync.Map{} _ = atomic.SwapInt64(&mem.txsBytes, 0) } @@ -380,13 +388,12 @@ func (mem *Mempool) CheckTxWithInfo(tx types.Tx, cb func(*abci.Response), txInfo // CACHE if !mem.cache.Push(tx) { - // record the sender - e, ok := mem.txsMap[sha256.Sum256(tx)] - // The check is needed because tx may be in cache, but not in the mempool. - // E.g. after we've committed a block, txs are removed from the mempool, - // but not from the cache. - if ok { - memTx := e.Value.(*mempoolTx) + // Record a new sender for a tx we've already seen. + // Note it's possible a tx is still in the cache but no longer in the mempool + // (eg. after committing a block, txs are removed from mempool but not cache), + // so we only record the sender for txs still in the mempool. + if e, ok := mem.txsMap.Load(txKey(tx)); ok { + memTx := e.(*clist.CElement).Value.(*mempoolTx) if _, loaded := memTx.senders.LoadOrStore(txInfo.PeerID, true); loaded { // TODO: consider punishing peer for dups, // its non-trivial since invalid txs can become valid, @@ -416,25 +423,21 @@ func (mem *Mempool) CheckTxWithInfo(tx types.Tx, cb func(*abci.Response), txInfo if err = mem.proxyAppConn.Error(); err != nil { return err } + reqRes := mem.proxyAppConn.CheckTxAsync(tx) - if cb != nil { - composedCallback := func(res *abci.Response) { - mem.reqResCb(tx, txInfo.PeerID)(res) - cb(res) - } - reqRes.SetCallback(composedCallback) - } else { - reqRes.SetCallback(mem.reqResCb(tx, txInfo.PeerID)) - } + reqRes.SetCallback(mem.reqResCb(tx, txInfo.PeerID, cb)) return nil } -// Global callback, which is called in the absence of the specific callback. -// -// In recheckTxs because no reqResCb (specific) callback is set, this callback -// will be called. -func (mem *Mempool) resCb(req *abci.Request, res *abci.Response) { +// Global callback that will be called after every ABCI response. +// Having a single global callback avoids needing to set a callback for each request. +// However, processing the checkTx response requires the peerID (so we can track which txs we heard from who), +// and peerID is not included in the ABCI request, so we have to set request-specific callbacks that +// include this information. If we're not in the midst of a recheck, this function will just return, +// so the request specific callback can do the work. +// When rechecking, we don't need the peerID, so the recheck callback happens here. +func (mem *Mempool) globalCb(req *abci.Request, res *abci.Response) { if mem.recheckCursor == nil { return } @@ -446,35 +449,50 @@ func (mem *Mempool) resCb(req *abci.Request, res *abci.Response) { mem.metrics.Size.Set(float64(mem.Size())) } -// Specific callback, which allows us to incorporate local information, like -// the peer that sent us this tx, so we can avoid sending it back to the same -// peer. +// Request specific callback that should be set on individual reqRes objects +// to incorporate local information when processing the response. +// This allows us to track the peer that sent us this tx, so we can avoid sending it back to them. +// NOTE: alternatively, we could include this information in the ABCI request itself. +// +// External callers of CheckTx, like the RPC, can also pass an externalCb through here that is called +// when all other response processing is complete. // // Used in CheckTxWithInfo to record PeerID who sent us the tx. -func (mem *Mempool) reqResCb(tx []byte, peerID uint16) func(res *abci.Response) { +func (mem *Mempool) reqResCb(tx []byte, peerID uint16, externalCb func(*abci.Response)) func(res *abci.Response) { return func(res *abci.Response) { if mem.recheckCursor != nil { - return + // this should never happen + panic("recheck cursor is not nil in reqResCb") } mem.resCbFirstTime(tx, peerID, res) // update metrics mem.metrics.Size.Set(float64(mem.Size())) + + // passed in by the caller of CheckTx, eg. the RPC + if externalCb != nil { + externalCb(res) + } } } +// Called from: +// - resCbFirstTime (lock not held) if tx is valid func (mem *Mempool) addTx(memTx *mempoolTx) { e := mem.txs.PushBack(memTx) - mem.txsMap[sha256.Sum256(memTx.tx)] = e + mem.txsMap.Store(txKey(memTx.tx), e) atomic.AddInt64(&mem.txsBytes, int64(len(memTx.tx))) mem.metrics.TxSizeBytes.Observe(float64(len(memTx.tx))) } +// Called from: +// - Update (lock held) if tx was committed +// - resCbRecheck (lock not held) if tx was invalidated func (mem *Mempool) removeTx(tx types.Tx, elem *clist.CElement, removeFromCache bool) { mem.txs.Remove(elem) elem.DetachPrev() - delete(mem.txsMap, sha256.Sum256(tx)) + mem.txsMap.Delete(txKey(tx)) atomic.AddInt64(&mem.txsBytes, int64(-len(tx))) if removeFromCache { @@ -733,7 +751,7 @@ func (mem *Mempool) recheckTxs(txs []types.Tx) { mem.recheckEnd = mem.txs.Back() // Push txs to proxyAppConn - // NOTE: reqResCb may be called concurrently. + // NOTE: globalCb may be called concurrently. for _, tx := range txs { mem.proxyAppConn.CheckTxAsync(tx) } @@ -746,8 +764,11 @@ func (mem *Mempool) recheckTxs(txs []types.Tx) { type mempoolTx struct { height int64 // height that this tx had been validated in gasWanted int64 // amount of gas this tx states it will require - senders sync.Map // ids of peers who've sent us this tx (as a map for quick lookups) tx types.Tx // + + // ids of peers who've sent us this tx (as a map for quick lookups). + // senders: PeerID -> bool + senders sync.Map } // Height returns the height for this transaction @@ -798,7 +819,7 @@ func (cache *mapTxCache) Push(tx types.Tx) bool { defer cache.mtx.Unlock() // Use the tx hash in the cache - txHash := sha256.Sum256(tx) + txHash := txKey(tx) if moved, exists := cache.map_[txHash]; exists { cache.list.MoveToBack(moved) return false @@ -820,7 +841,7 @@ func (cache *mapTxCache) Push(tx types.Tx) bool { // Remove removes the given tx from the cache. func (cache *mapTxCache) Remove(tx types.Tx) { cache.mtx.Lock() - txHash := sha256.Sum256(tx) + txHash := txKey(tx) popped := cache.map_[txHash] delete(cache.map_, txHash) if popped != nil { diff --git a/mempool/mempool_test.go b/mempool/mempool_test.go index dc7d595a..d5f25396 100644 --- a/mempool/mempool_test.go +++ b/mempool/mempool_test.go @@ -6,6 +6,7 @@ import ( "encoding/binary" "fmt" "io/ioutil" + mrand "math/rand" "os" "path/filepath" "testing" @@ -18,6 +19,7 @@ import ( "github.com/tendermint/tendermint/abci/example/counter" "github.com/tendermint/tendermint/abci/example/kvstore" + abciserver "github.com/tendermint/tendermint/abci/server" abci "github.com/tendermint/tendermint/abci/types" cfg "github.com/tendermint/tendermint/config" cmn "github.com/tendermint/tendermint/libs/common" @@ -510,6 +512,54 @@ func TestMempoolTxsBytes(t *testing.T) { assert.EqualValues(t, 0, mempool.TxsBytes()) } +// This will non-deterministically catch some concurrency failures like +// https://github.com/tendermint/tendermint/issues/3509 +// TODO: all of the tests should probably also run using the remote proxy app +// since otherwise we're not actually testing the concurrency of the mempool here! +func TestMempoolRemoteAppConcurrency(t *testing.T) { + sockPath := fmt.Sprintf("unix:///tmp/echo_%v.sock", cmn.RandStr(6)) + app := kvstore.NewKVStoreApplication() + cc, server := newRemoteApp(t, sockPath, app) + defer server.Stop() + config := cfg.ResetTestRoot("mempool_test") + mempool, cleanup := newMempoolWithAppAndConfig(cc, config) + defer cleanup() + + // generate small number of txs + nTxs := 10 + txLen := 200 + txs := make([]types.Tx, nTxs) + for i := 0; i < nTxs; i++ { + txs[i] = cmn.RandBytes(txLen) + } + + // simulate a group of peers sending them over and over + N := config.Mempool.Size + maxPeers := 5 + for i := 0; i < N; i++ { + peerID := mrand.Intn(maxPeers) + txNum := mrand.Intn(nTxs) + tx := txs[int(txNum)] + + // this will err with ErrTxInCache many times ... + mempool.CheckTxWithInfo(tx, nil, TxInfo{PeerID: uint16(peerID)}) + } + err := mempool.FlushAppConn() + require.NoError(t, err) +} + +// caller must close server +func newRemoteApp(t *testing.T, addr string, app abci.Application) (clientCreator proxy.ClientCreator, server cmn.Service) { + clientCreator = proxy.NewRemoteClientCreator(addr, "socket", true) + + // Start server + server = abciserver.NewSocketServer(addr, app) + server.SetLogger(log.TestingLogger().With("module", "abci-server")) + if err := server.Start(); err != nil { + t.Fatalf("Error starting socket server: %v", err.Error()) + } + return clientCreator, server +} func checksumIt(data []byte) string { h := sha256.New() h.Write(data)