From f06cc6630b5fbc915ee02fafebe9e1a1ad12c061 Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Fri, 19 Jan 2018 00:57:00 -0500 Subject: [PATCH] mempool: cfg.CacheSize and expose InitWAL --- config/config.go | 18 ++++++++++++++++-- consensus/replay_test.go | 6 ++++-- mempool/mempool.go | 12 +++++------- mempool/mempool_test.go | 5 +++-- node/node.go | 1 + 5 files changed, 29 insertions(+), 13 deletions(-) diff --git a/config/config.go b/config/config.go index 77bd54d3..072606b4 100644 --- a/config/config.go +++ b/config/config.go @@ -60,9 +60,9 @@ func TestConfig() *Config { BaseConfig: TestBaseConfig(), RPC: TestRPCConfig(), P2P: TestP2PConfig(), - Mempool: DefaultMempoolConfig(), + Mempool: TestMempoolConfig(), Consensus: TestConsensusConfig(), - TxIndex: DefaultTxIndexConfig(), + TxIndex: TestTxIndexConfig(), } } @@ -313,6 +313,7 @@ type MempoolConfig struct { RecheckEmpty bool `mapstructure:"recheck_empty"` Broadcast bool `mapstructure:"broadcast"` WalPath string `mapstructure:"wal_dir"` + CacheSize int `mapstructure:"cache_size"` } // DefaultMempoolConfig returns a default configuration for the Tendermint mempool @@ -322,9 +323,17 @@ func DefaultMempoolConfig() *MempoolConfig { RecheckEmpty: true, Broadcast: true, WalPath: filepath.Join(defaultDataDir, "mempool.wal"), + CacheSize: 100000, } } +// TestMempoolConfig returns a configuration for testing the Tendermint mempool +func TestMempoolConfig() *MempoolConfig { + config := DefaultMempoolConfig() + config.CacheSize = 1000 + return config +} + // WalDir returns the full path to the mempool's write-ahead log func (m *MempoolConfig) WalDir() string { return rootify(m.WalPath, m.RootDir) @@ -492,6 +501,11 @@ func DefaultTxIndexConfig() *TxIndexConfig { } } +// TestTxIndexConfig returns a default configuration for the transaction indexer. +func TestTxIndexConfig() *TxIndexConfig { + return DefaultTxIndexConfig() +} + //----------------------------------------------------------------------------- // Utils diff --git a/consensus/replay_test.go b/consensus/replay_test.go index 24255262..a4786dde 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -82,12 +82,14 @@ func startNewConsensusStateAndWaitForBlock(t *testing.T, lastBlockHeight int64, func sendTxs(cs *ConsensusState, ctx context.Context) { i := 0 - for { + tx := []byte{byte(i)} + for i := 0; i < 256; i++ { select { case <-ctx.Done(): return default: - cs.mempool.CheckTx([]byte{byte(i)}, nil) + tx[0] = byte(i) + cs.mempool.CheckTx(tx, nil) i++ } } diff --git a/mempool/mempool.go b/mempool/mempool.go index ccd615ac..04dbe50a 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -3,7 +3,6 @@ package mempool import ( "bytes" "container/list" - "fmt" "sync" "sync/atomic" "time" @@ -49,7 +48,7 @@ TODO: Better handle abci client errors. (make it automatically handle connection */ -const cacheSize = 100000 +var ErrTxInCache = errors.New("Tx already exists in cache") // Mempool is an ordered in-memory pool for transactions before they are proposed in a consensus // round. Transaction validity is checked using the CheckTx abci message before the transaction is @@ -92,9 +91,8 @@ func NewMempool(config *cfg.MempoolConfig, proxyAppConn proxy.AppConnMempool, he recheckCursor: nil, recheckEnd: nil, logger: log.NewNopLogger(), - cache: newTxCache(cacheSize), + cache: newTxCache(config.CacheSize), } - mempool.initWAL() proxyAppConn.SetResponseCallback(mempool.resCb) return mempool } @@ -131,7 +129,7 @@ func (mem *Mempool) CloseWAL() bool { return true } -func (mem *Mempool) initWAL() { +func (mem *Mempool) InitWAL() { walDir := mem.config.WalDir() if walDir != "" { err := cmn.EnsureDir(walDir, 0700) @@ -192,7 +190,7 @@ func (mem *Mempool) CheckTx(tx types.Tx, cb func(*abci.Response)) (err error) { // CACHE if mem.cache.Exists(tx) { - return fmt.Errorf("Tx already exists in cache") + return ErrTxInCache } mem.cache.Push(tx) // END CACHE @@ -449,7 +447,7 @@ func newTxCache(cacheSize int) *txCache { // Reset resets the txCache to empty. func (cache *txCache) Reset() { cache.mtx.Lock() - cache.map_ = make(map[string]struct{}, cacheSize) + cache.map_ = make(map[string]struct{}, cache.size) cache.list.Init() cache.mtx.Unlock() } diff --git a/mempool/mempool_test.go b/mempool/mempool_test.go index 4d75cc58..6dfb5984 100644 --- a/mempool/mempool_test.go +++ b/mempool/mempool_test.go @@ -236,12 +236,13 @@ func TestMempoolCloseWAL(t *testing.T) { require.Equal(t, 0, len(m1), "no matches yet") // 3. Create the mempool - wcfg := *(cfg.DefaultMempoolConfig()) + wcfg := cfg.DefaultMempoolConfig() wcfg.RootDir = rootDir app := dummy.NewDummyApplication() cc := proxy.NewLocalClientCreator(app) appConnMem, _ := cc.NewABCIClient() - mempool := NewMempool(&wcfg, appConnMem, 10) + mempool := NewMempool(wcfg, appConnMem, 10) + mempool.InitWAL() // 4. Ensure that the directory contains the WAL file m2, err := filepath.Glob(filepath.Join(rootDir, "*")) diff --git a/node/node.go b/node/node.go index 6c816a9e..3012ed05 100644 --- a/node/node.go +++ b/node/node.go @@ -189,6 +189,7 @@ func NewNode(config *cfg.Config, // Make MempoolReactor mempoolLogger := logger.With("module", "mempool") mempool := mempl.NewMempool(config.Mempool, proxyApp.Mempool(), state.LastBlockHeight) + mempool.InitWAL() // no need to have the mempool wal during tests mempool.SetLogger(mempoolLogger) mempoolReactor := mempl.NewMempoolReactor(config.Mempool, mempool) mempoolReactor.SetLogger(mempoolLogger)