Bucky/mempool txsmap (#3512)

* mempool: resCb -> globalCb

* reqResCb takes an externalCb

* failing test for #3509

* txsMap is sync.Map

* update changelog
Ethan Buchman authored 2019-03-31 07:14:18 -04:00; committed by Ismail Khoffi
parent 2233dd45bd
commit 422d04c8ba
5 changed files with 141 additions and 60 deletions

diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md

@@ -1,6 +1,9 @@
-## v0.32.0
+## v0.31.2
 
-**
+*March 30th, 2019*
+
+This release fixes a regression from v0.31.1 where Tendermint panics under
+mempool load for external ABCI apps.
 
 ### BREAKING CHANGES:

@@ -19,6 +22,8 @@
 ### IMPROVEMENTS:
 
-- [CircleCI] \#3497 Move release management to CircleCI
+- [circle] \#3497 Move release management to CircleCI
 
 ### BUG FIXES:
+
+- [mempool] \#3512 Fix panic from concurrent access to txsMap, a regression for external ABCI apps introduced in v0.31.1
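For context on that bug-fix entry: Go's runtime aborts the whole process when a plain map is read and written concurrently, which is what happened to txsMap once external ABCI apps drove CheckTx responses from their own goroutine. Below is a minimal standalone sketch of the failure mode and the sync.Map remedy; it is illustrative only, not code from this commit.

```go
package main

import "sync"

func main() {
	// A plain `map[int]int` mutated by these goroutines without a mutex
	// would crash with "fatal error: concurrent map read and map write".
	// sync.Map is safe for concurrent use without external locking.
	var m sync.Map
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			m.Store(i, i*i)  // concurrent writes are safe
			_, _ = m.Load(i) // concurrent reads are safe
		}(i)
	}
	wg.Wait()
}
```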

diff --git a/abci/client/socket_client.go b/abci/client/socket_client.go

@@ -26,16 +26,17 @@ var _ Client = (*socketClient)(nil)
 type socketClient struct {
 	cmn.BaseService
 
-	reqQueue    chan *ReqRes
-	flushTimer  *cmn.ThrottleTimer
+	addr        string
 	mustConnect bool
+	conn        net.Conn
+
+	reqQueue   chan *ReqRes
+	flushTimer *cmn.ThrottleTimer
 
 	mtx     sync.Mutex
-	addr    string
-	conn    net.Conn
 	err     error
-	reqSent *list.List
-	resCb   func(*types.Request, *types.Response) // listens to all callbacks
+	reqSent *list.List                            // list of requests sent, waiting for response
+	resCb   func(*types.Request, *types.Response) // called on all requests, if set.
 }

@@ -86,6 +87,7 @@ func (cli *socketClient) OnStop() {
 	cli.mtx.Lock()
 	defer cli.mtx.Unlock()
 	if cli.conn != nil {
+		// does this really need a mutex?
 		cli.conn.Close()
 	}

@@ -207,12 +209,15 @@ func (cli *socketClient) didRecvResponse(res *types.Response) error {
 	reqres.Done()            // Release waiters
 	cli.reqSent.Remove(next) // Pop first item from linked list
 
-	// Notify reqRes listener if set
+	// Notify reqRes listener if set (request specific callback).
+	// NOTE: it is possible this callback isn't set on the reqres object.
+	// at this point, in which case it will be called after, when it is set.
+	// TODO: should we move this after the resCb call so the order is always consistent?
 	if cb := reqres.GetCallback(); cb != nil {
 		cb(res)
 	}
 
-	// Notify client listener if set
+	// Notify client listener if set (global callback).
 	if cli.resCb != nil {
 		cli.resCb(reqres.Request, res)
 	}

diff --git a/mempool/cache_test.go b/mempool/cache_test.go

@@ -19,7 +19,7 @@ func TestCacheRemove(t *testing.T) {
 	for i := 0; i < numTxs; i++ {
 		// probability of collision is 2**-256
 		txBytes := make([]byte, 32)
-		rand.Read(txBytes)
+		rand.Read(txBytes) // nolint: gosec
 		txs[i] = txBytes
 		cache.Push(txBytes)
 		// make sure its added to both the linked list and the map

diff --git a/mempool/mempool.go b/mempool/mempool.go

@@ -149,6 +149,11 @@ func TxID(tx []byte) string {
 	return fmt.Sprintf("%X", types.Tx(tx).Hash())
 }
 
+// txKey is the fixed length array sha256 hash used as the key in maps.
+func txKey(tx types.Tx) [sha256.Size]byte {
+	return sha256.Sum256(tx)
+}
+
 // Mempool is an ordered in-memory pool for transactions before they are proposed in a consensus
 // round. Transaction validity is checked using the CheckTx abci message before the transaction is
 // added to the pool. The Mempool uses a concurrent list structure for storing transactions that
@@ -159,23 +164,27 @@ type Mempool struct {
 	proxyMtx     sync.Mutex
 	proxyAppConn proxy.AppConnMempool
 	txs          *clist.CList // concurrent linked-list of good txs
-	// map for quick access to txs
-	// Used in CheckTx to record the tx sender.
-	txsMap map[[sha256.Size]byte]*clist.CElement
-	height        int64           // the last block Update()'d to
-	rechecking    int32           // for re-checking filtered txs on Update()
-	recheckCursor *clist.CElement // next expected response
-	recheckEnd    *clist.CElement // re-checking stops here
+	preCheck     PreCheckFunc
+	postCheck    PostCheckFunc
+
+	// Track whether we're rechecking txs.
+	// These are not protected by a mutex and are expected to be mutated
+	// in serial (ie. by abci responses which are called in serial).
+	recheckCursor *clist.CElement // next expected response
+	recheckEnd    *clist.CElement // re-checking stops here
+
+	// notify listeners (ie. consensus) when txs are available
 	notifiedTxsAvailable bool
 	txsAvailable         chan struct{} // fires once for each height, when the mempool is not empty
-	preCheck             PreCheckFunc
-	postCheck            PostCheckFunc
+
+	// Map for quick access to txs to record sender in CheckTx.
+	// txsMap: txKey -> CElement
+	txsMap sync.Map
 
 	// Atomic integers
-
-	// Used to check if the mempool size is bigger than the allowed limit.
-	// See TxsBytes
-	txsBytes int64
+	height     int64 // the last block Update()'d to
+	rechecking int32 // for re-checking filtered txs on Update()
+	txsBytes   int64 // total size of mempool, in bytes
 
 	// Keep a cache of already-seen txs.
 	// This reduces the pressure on the proxyApp.
@@ -203,7 +212,6 @@ func NewMempool(
 		config:        config,
 		proxyAppConn:  proxyAppConn,
 		txs:           clist.New(),
-		txsMap:        make(map[[sha256.Size]byte]*clist.CElement),
 		height:        height,
 		rechecking:    0,
 		recheckCursor: nil,

@@ -216,7 +224,7 @@ func NewMempool(
 	} else {
 		mempool.cache = nopTxCache{}
 	}
-	proxyAppConn.SetResponseCallback(mempool.resCb)
+	proxyAppConn.SetResponseCallback(mempool.globalCb)
 	for _, option := range options {
 		option(mempool)
 	}

@@ -319,7 +327,7 @@ func (mem *Mempool) Flush() {
 		e.DetachPrev()
 	}
-	mem.txsMap = make(map[[sha256.Size]byte]*clist.CElement)
+	mem.txsMap = sync.Map{}
 	_ = atomic.SwapInt64(&mem.txsBytes, 0)
 }
@@ -380,13 +388,12 @@ func (mem *Mempool) CheckTxWithInfo(tx types.Tx, cb func(*abci.Response), txInfo
 	// CACHE
 	if !mem.cache.Push(tx) {
-		// record the sender
-		e, ok := mem.txsMap[sha256.Sum256(tx)]
-		// The check is needed because tx may be in cache, but not in the mempool.
-		// E.g. after we've committed a block, txs are removed from the mempool,
-		// but not from the cache.
-		if ok {
-			memTx := e.Value.(*mempoolTx)
+		// Record a new sender for a tx we've already seen.
+		// Note it's possible a tx is still in the cache but no longer in the mempool
+		// (eg. after committing a block, txs are removed from mempool but not cache),
+		// so we only record the sender for txs still in the mempool.
+		if e, ok := mem.txsMap.Load(txKey(tx)); ok {
+			memTx := e.(*clist.CElement).Value.(*mempoolTx)
 			if _, loaded := memTx.senders.LoadOrStore(txInfo.PeerID, true); loaded {
 				// TODO: consider punishing peer for dups,
 				// its non-trivial since invalid txs can become valid,
@@ -416,25 +423,21 @@ func (mem *Mempool) CheckTxWithInfo(tx types.Tx, cb func(*abci.Response), txInfo
 	if err = mem.proxyAppConn.Error(); err != nil {
 		return err
 	}
 	reqRes := mem.proxyAppConn.CheckTxAsync(tx)
-	if cb != nil {
-		composedCallback := func(res *abci.Response) {
-			mem.reqResCb(tx, txInfo.PeerID)(res)
-			cb(res)
-		}
-		reqRes.SetCallback(composedCallback)
-	} else {
-		reqRes.SetCallback(mem.reqResCb(tx, txInfo.PeerID))
-	}
+	reqRes.SetCallback(mem.reqResCb(tx, txInfo.PeerID, cb))
 
 	return nil
 }
 
-// Global callback, which is called in the absence of the specific callback.
-//
-// In recheckTxs because no reqResCb (specific) callback is set, this callback
-// will be called.
-func (mem *Mempool) resCb(req *abci.Request, res *abci.Response) {
+// Global callback that will be called after every ABCI response.
+// Having a single global callback avoids needing to set a callback for each request.
+// However, processing the checkTx response requires the peerID (so we can track which txs we heard from who),
+// and peerID is not included in the ABCI request, so we have to set request-specific callbacks that
+// include this information. If we're not in the midst of a recheck, this function will just return,
+// so the request specific callback can do the work.
+// When rechecking, we don't need the peerID, so the recheck callback happens here.
+func (mem *Mempool) globalCb(req *abci.Request, res *abci.Response) {
 	if mem.recheckCursor == nil {
 		return
 	}
@@ -446,35 +449,50 @@ func (mem *Mempool) resCb(req *abci.Request, res *abci.Response) {
 	mem.metrics.Size.Set(float64(mem.Size()))
 }
 
-// Specific callback, which allows us to incorporate local information, like
-// the peer that sent us this tx, so we can avoid sending it back to the same
-// peer.
+// Request specific callback that should be set on individual reqRes objects
+// to incorporate local information when processing the response.
+// This allows us to track the peer that sent us this tx, so we can avoid sending it back to them.
+// NOTE: alternatively, we could include this information in the ABCI request itself.
+//
+// External callers of CheckTx, like the RPC, can also pass an externalCb through here that is called
+// when all other response processing is complete.
 //
 // Used in CheckTxWithInfo to record PeerID who sent us the tx.
-func (mem *Mempool) reqResCb(tx []byte, peerID uint16) func(res *abci.Response) {
+func (mem *Mempool) reqResCb(tx []byte, peerID uint16, externalCb func(*abci.Response)) func(res *abci.Response) {
 	return func(res *abci.Response) {
 		if mem.recheckCursor != nil {
-			return
+			// this should never happen
+			panic("recheck cursor is not nil in reqResCb")
 		}
+
 		mem.resCbFirstTime(tx, peerID, res)
+
 		// update metrics
 		mem.metrics.Size.Set(float64(mem.Size()))
+
+		// passed in by the caller of CheckTx, eg. the RPC
+		if externalCb != nil {
+			externalCb(res)
+		}
 	}
 }
 
+// Called from:
+//  - resCbFirstTime (lock not held) if tx is valid
 func (mem *Mempool) addTx(memTx *mempoolTx) {
 	e := mem.txs.PushBack(memTx)
-	mem.txsMap[sha256.Sum256(memTx.tx)] = e
+	mem.txsMap.Store(txKey(memTx.tx), e)
 	atomic.AddInt64(&mem.txsBytes, int64(len(memTx.tx)))
 	mem.metrics.TxSizeBytes.Observe(float64(len(memTx.tx)))
 }
 
+// Called from:
+//  - Update (lock held) if tx was committed
+//  - resCbRecheck (lock not held) if tx was invalidated
 func (mem *Mempool) removeTx(tx types.Tx, elem *clist.CElement, removeFromCache bool) {
 	mem.txs.Remove(elem)
 	elem.DetachPrev()
-	delete(mem.txsMap, sha256.Sum256(tx))
+	mem.txsMap.Delete(txKey(tx))
 	atomic.AddInt64(&mem.txsBytes, int64(-len(tx)))
 
 	if removeFromCache {
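Because the ABCI request carries no peerID, the new reqResCb above closes over it (and over the caller's optional callback) and is installed per request. A stripped-down sketch of that closure pattern follows; makeCb and response are hypothetical stand-ins for mem.reqResCb and *abci.Response.

```go
package main

import "fmt"

type response struct{ code uint32 }

// makeCb mirrors the reqResCb factory: peerID and the external callback are
// captured in the closure, since neither travels inside the ABCI request.
func makeCb(tx []byte, peerID uint16, externalCb func(*response)) func(*response) {
	return func(res *response) {
		// request-specific processing: we know exactly which peer sent tx
		fmt.Printf("peer %d sent tx %X -> code %d\n", peerID, tx, res.code)

		// the caller of CheckTx (eg. the RPC) is notified last
		if externalCb != nil {
			externalCb(res)
		}
	}
}

func main() {
	cb := makeCb([]byte{0xBE, 0xEF}, 7, func(res *response) {
		fmt.Println("external caller sees code", res.code)
	})
	cb(&response{code: 0}) // the ABCI client would invoke this on response
}
```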
@@ -733,7 +751,7 @@ func (mem *Mempool) recheckTxs(txs []types.Tx) {
 	mem.recheckEnd = mem.txs.Back()
 
 	// Push txs to proxyAppConn
-	// NOTE: reqResCb may be called concurrently.
+	// NOTE: globalCb may be called concurrently.
 	for _, tx := range txs {
 		mem.proxyAppConn.CheckTxAsync(tx)
 	}

@@ -746,8 +764,11 @@
 type mempoolTx struct {
 	height    int64    // height that this tx had been validated in
 	gasWanted int64    // amount of gas this tx states it will require
-	senders   sync.Map // ids of peers who've sent us this tx (as a map for quick lookups)
 	tx        types.Tx //
+
+	// ids of peers who've sent us this tx (as a map for quick lookups).
+	// senders: PeerID -> bool
+	senders sync.Map
 }
 
 // Height returns the height for this transaction
@@ -798,7 +819,7 @@ func (cache *mapTxCache) Push(tx types.Tx) bool {
 	defer cache.mtx.Unlock()
 
 	// Use the tx hash in the cache
-	txHash := sha256.Sum256(tx)
+	txHash := txKey(tx)
 	if moved, exists := cache.map_[txHash]; exists {
 		cache.list.MoveToBack(moved)
 		return false

@@ -820,7 +841,7 @@ func (cache *mapTxCache) Push(tx types.Tx) bool {
 // Remove removes the given tx from the cache.
 func (cache *mapTxCache) Remove(tx types.Tx) {
 	cache.mtx.Lock()
-	txHash := sha256.Sum256(tx)
+	txHash := txKey(tx)
 	popped := cache.map_[txHash]
 	delete(cache.map_, txHash)
 	if popped != nil {
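A note on the sync.Map trade-off visible throughout these hunks: values come back as interface{}, so every read needs a type assertion, which is why CheckTxWithInfo writes e.(*clist.CElement). Here is a self-contained sketch of the Store/Load/Delete lifecycle that txsMap now follows; element is a stand-in for *clist.CElement, not a real type from the repo.

```go
package main

import (
	"crypto/sha256"
	"fmt"
	"sync"
)

type element struct{ tx []byte } // stand-in for *clist.CElement

func main() {
	var txsMap sync.Map

	tx := []byte("some-tx")
	key := sha256.Sum256(tx) // fixed-size [32]byte key, like txKey()

	txsMap.Store(key, &element{tx: tx}) // addTx
	if e, ok := txsMap.Load(key); ok {  // the CheckTxWithInfo cache path
		fmt.Printf("known tx: %X\n", e.(*element).tx) // assertion required on read
	}
	txsMap.Delete(key) // removeTx

	// Flush resets the whole map in one assignment: mem.txsMap = sync.Map{}
}
```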

diff --git a/mempool/mempool_test.go b/mempool/mempool_test.go

@@ -6,6 +6,7 @@ import (
 	"encoding/binary"
 	"fmt"
 	"io/ioutil"
+	mrand "math/rand"
 	"os"
 	"path/filepath"
 	"testing"
@@ -18,6 +19,7 @@ import (
 	"github.com/tendermint/tendermint/abci/example/counter"
 	"github.com/tendermint/tendermint/abci/example/kvstore"
+	abciserver "github.com/tendermint/tendermint/abci/server"
 	abci "github.com/tendermint/tendermint/abci/types"
 	cfg "github.com/tendermint/tendermint/config"
 	cmn "github.com/tendermint/tendermint/libs/common"
@@ -510,6 +512,54 @@ func TestMempoolTxsBytes(t *testing.T) {
 	assert.EqualValues(t, 0, mempool.TxsBytes())
 }
 
+// This will non-deterministically catch some concurrency failures like
+// https://github.com/tendermint/tendermint/issues/3509
+// TODO: all of the tests should probably also run using the remote proxy app
+// since otherwise we're not actually testing the concurrency of the mempool here!
+func TestMempoolRemoteAppConcurrency(t *testing.T) {
+	sockPath := fmt.Sprintf("unix:///tmp/echo_%v.sock", cmn.RandStr(6))
+	app := kvstore.NewKVStoreApplication()
+	cc, server := newRemoteApp(t, sockPath, app)
+	defer server.Stop()
+
+	config := cfg.ResetTestRoot("mempool_test")
+
+	mempool, cleanup := newMempoolWithAppAndConfig(cc, config)
+	defer cleanup()
+
+	// generate small number of txs
+	nTxs := 10
+	txLen := 200
+	txs := make([]types.Tx, nTxs)
+	for i := 0; i < nTxs; i++ {
+		txs[i] = cmn.RandBytes(txLen)
+	}
+
+	// simulate a group of peers sending them over and over
+	N := config.Mempool.Size
+	maxPeers := 5
+	for i := 0; i < N; i++ {
+		peerID := mrand.Intn(maxPeers)
+		txNum := mrand.Intn(nTxs)
+		tx := txs[int(txNum)]
+
+		// this will err with ErrTxInCache many times ...
+		mempool.CheckTxWithInfo(tx, nil, TxInfo{PeerID: uint16(peerID)})
+	}
+	err := mempool.FlushAppConn()
+	require.NoError(t, err)
+}
+
+// caller must close server
+func newRemoteApp(t *testing.T, addr string, app abci.Application) (clientCreator proxy.ClientCreator, server cmn.Service) {
+	clientCreator = proxy.NewRemoteClientCreator(addr, "socket", true)
+
+	// Start server
+	server = abciserver.NewSocketServer(addr, app)
+	server.SetLogger(log.TestingLogger().With("module", "abci-server"))
+	if err := server.Start(); err != nil {
+		t.Fatalf("Error starting socket server: %v", err.Error())
+	}
+	return clientCreator, server
+}
+
 func checksumIt(data []byte) string {
 	h := sha256.New()
 	h.Write(data)
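A closing note on the new test: since TestMempoolRemoteAppConcurrency catches the #3509 panic only non-deterministically, it is presumably most effective when run repeatedly or under Go's race detector (go test -race ./mempool), which can flag the unsynchronized map access even on runs where the runtime happens not to crash.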