reap max bytes from the mempool & check transaction size

See ADR 020: Limiting txs size inside a block docs/architecture/adr-020-block-size.md

Refs #2035
This commit is contained in:
Anton Kaliaev
2018-08-08 16:03:58 +04:00
parent 7b2f7090fd
commit d73c5cbdb1
33 changed files with 545 additions and 401 deletions

View File

@@ -80,6 +80,8 @@ type Mempool struct {
recheckEnd *clist.CElement // re-checking stops here
notifiedTxsAvailable bool
txsAvailable chan struct{} // fires once for each height, when the mempool is not empty
// Filter mempool to only accept txs for which filter(tx) returns true.
filter func(types.Tx) bool
// Keep a cache of already-seen txs.
// This reduces the pressure on the proxyApp.
@@ -139,6 +141,14 @@ func (mem *Mempool) SetLogger(l log.Logger) {
mem.logger = l
}
// SetFilter sets a filter for mempool to only accept txs for which f(tx)
// returns true.
func (mem *Mempool) SetFilter(f func(types.Tx) bool) {
mem.proxyMtx.Lock()
mem.filter = f
mem.proxyMtx.Unlock()
}
// WithMetrics sets the metrics.
func WithMetrics(metrics *Metrics) MempoolOption {
return func(mem *Mempool) { mem.metrics = metrics }
@@ -240,6 +250,10 @@ func (mem *Mempool) CheckTx(tx types.Tx, cb func(*abci.Response)) (err error) {
return ErrMempoolIsFull
}
if mem.filter != nil && !mem.filter(tx) {
return
}
// CACHE
if !mem.cache.Push(tx) {
return ErrTxInCache
@@ -367,9 +381,10 @@ func (mem *Mempool) notifyTxsAvailable() {
}
}
// Reap returns a list of transactions currently in the mempool.
// If maxTxs is -1, there is no cap on the number of returned transactions.
func (mem *Mempool) Reap(maxTxs int) types.Txs {
// ReapMaxBytes reaps transactions from the mempool up to n bytes total.
// If max is negative, there is no cap on the size of all returned
// transactions (~ all available transactions).
func (mem *Mempool) ReapMaxBytes(max int) types.Txs {
mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock()
@@ -378,19 +393,39 @@ func (mem *Mempool) Reap(maxTxs int) types.Txs {
time.Sleep(time.Millisecond * 10)
}
txs := mem.collectTxs(maxTxs)
var cur int
// TODO: we will get a performance boost if we have a good estimate of avg
// size per tx, and set the initial capacity based off of that.
// txs := make([]types.Tx, 0, cmn.MinInt(mem.txs.Len(), max/mem.avgTxSize))
txs := make([]types.Tx, 0, mem.txs.Len())
for e := mem.txs.Front(); e != nil; e = e.Next() {
memTx := e.Value.(*mempoolTx)
if max > 0 && cur+len(memTx.tx)+types.MaxAminoOverheadForTx > max {
return txs
}
cur += len(memTx.tx) + types.MaxAminoOverheadForTx
txs = append(txs, memTx.tx)
}
return txs
}
// maxTxs: -1 means uncapped, 0 means none
func (mem *Mempool) collectTxs(maxTxs int) types.Txs {
if maxTxs == 0 {
return []types.Tx{}
} else if maxTxs < 0 {
maxTxs = mem.txs.Len()
// ReapMaxTxs reaps up to max transactions from the mempool.
// If max is negative, function panics.
func (mem *Mempool) ReapMaxTxs(max int) types.Txs {
mem.proxyMtx.Lock()
defer mem.proxyMtx.Unlock()
if max < 0 {
panic("Called ReapMaxTxs with negative max")
}
txs := make([]types.Tx, 0, cmn.MinInt(mem.txs.Len(), maxTxs))
for e := mem.txs.Front(); e != nil && len(txs) < maxTxs; e = e.Next() {
for atomic.LoadInt32(&mem.rechecking) > 0 {
// TODO: Something better?
time.Sleep(time.Millisecond * 10)
}
txs := make([]types.Tx, 0, cmn.MinInt(mem.txs.Len(), max))
for e := mem.txs.Front(); e != nil && len(txs) <= max; e = e.Next() {
memTx := e.Value.(*mempoolTx)
txs = append(txs, memTx.tx)
}
@@ -400,7 +435,7 @@ func (mem *Mempool) collectTxs(maxTxs int) types.Txs {
// Update informs the mempool that the given txs were committed and can be discarded.
// NOTE: this should be called *after* block is committed by consensus.
// NOTE: unsafe; Lock/Unlock must be managed by caller
func (mem *Mempool) Update(height int64, txs types.Txs) error {
func (mem *Mempool) Update(height int64, txs types.Txs, filter func(types.Tx) bool) error {
// First, create a lookup map of txns in new txs.
txsMap := make(map[string]struct{}, len(txs))
for _, tx := range txs {
@@ -411,6 +446,10 @@ func (mem *Mempool) Update(height int64, txs types.Txs) error {
mem.height = height
mem.notifiedTxsAvailable = false
if filter != nil {
mem.filter = filter
}
// Remove transactions that are already in txs.
goodTxs := mem.filterTxs(txsMap)
// Recheck mempool txs if any txs were committed in the block
@@ -423,7 +462,10 @@ func (mem *Mempool) Update(height int64, txs types.Txs) error {
// mem.recheckCursor re-scans mem.txs and possibly removes some txs.
// Before mem.Reap(), we should wait for mem.recheckCursor to be nil.
}
// Update metrics
mem.metrics.Size.Set(float64(mem.Size()))
return nil
}