mirror of
https://github.com/fluencelabs/tendermint
synced 2025-04-24 14:22:16 +00:00
mempool: make max_msg_bytes configurable (#3826)
* mempool: make max_msg_bytes configurable * apply suggestions from code review * update changelog pending * apply suggestions from code review again
This commit is contained in:
parent
c6daa48368
commit
5398420103
@ -24,5 +24,6 @@ program](https://hackerone.com/tendermint).
|
||||
- [rpc] \#2252 Add `/broadcast_evidence` endpoint to submit double signing and other types of evidence
|
||||
- [rpc] \#3818 Make `max_body_bytes` and `max_header_bytes` configurable
|
||||
- [p2p] \#3664 p2p/conn: reuse buffer when write/read from secret connection
|
||||
- [mempool] \#3826 Make `max_msg_bytes` configurable
|
||||
|
||||
### BUG FIXES:
|
||||
|
@ -631,6 +631,7 @@ type MempoolConfig struct {
|
||||
Size int `mapstructure:"size"`
|
||||
MaxTxsBytes int64 `mapstructure:"max_txs_bytes"`
|
||||
CacheSize int `mapstructure:"cache_size"`
|
||||
MaxMsgBytes int `mapstructure:"max_msg_bytes"`
|
||||
}
|
||||
|
||||
// DefaultMempoolConfig returns a default configuration for the Tendermint mempool
|
||||
@ -644,6 +645,7 @@ func DefaultMempoolConfig() *MempoolConfig {
|
||||
Size: 5000,
|
||||
MaxTxsBytes: 1024 * 1024 * 1024, // 1GB
|
||||
CacheSize: 10000,
|
||||
MaxMsgBytes: 1024 * 1024, // 1MB
|
||||
}
|
||||
}
|
||||
|
||||
@ -676,6 +678,9 @@ func (cfg *MempoolConfig) ValidateBasic() error {
|
||||
if cfg.CacheSize < 0 {
|
||||
return errors.New("cache_size can't be negative")
|
||||
}
|
||||
if cfg.MaxMsgBytes < 0 {
|
||||
return errors.New("max_msg_bytes can't be negative")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -294,6 +294,9 @@ max_txs_bytes = {{ .Mempool.MaxTxsBytes }}
|
||||
# Size of the cache (used to filter transactions we saw earlier) in transactions
|
||||
cache_size = {{ .Mempool.CacheSize }}
|
||||
|
||||
# Limit the size of TxMessage
|
||||
max_msg_bytes = {{ .Mempool.MaxMsgBytes }}
|
||||
|
||||
##### consensus configuration options #####
|
||||
[consensus]
|
||||
|
||||
|
@ -240,6 +240,9 @@ max_txs_bytes = 1073741824
|
||||
# Size of the cache (used to filter transactions we saw earlier) in transactions
|
||||
cache_size = 10000
|
||||
|
||||
# Limit the size of TxMessage
|
||||
max_msg_bytes = 1048576
|
||||
|
||||
##### consensus configuration options #####
|
||||
[consensus]
|
||||
|
||||
|
@ -220,9 +220,10 @@ func (mem *CListMempool) CheckTxWithInfo(tx types.Tx, cb func(*abci.Response), t
|
||||
var (
|
||||
memSize = mem.Size()
|
||||
txsBytes = mem.TxsBytes()
|
||||
txSize = len(tx)
|
||||
)
|
||||
if memSize >= mem.config.Size ||
|
||||
int64(len(tx))+txsBytes > mem.config.MaxTxsBytes {
|
||||
int64(txSize)+txsBytes > mem.config.MaxTxsBytes {
|
||||
return ErrMempoolIsFull{
|
||||
memSize, mem.config.Size,
|
||||
txsBytes, mem.config.MaxTxsBytes}
|
||||
@ -231,8 +232,8 @@ func (mem *CListMempool) CheckTxWithInfo(tx types.Tx, cb func(*abci.Response), t
|
||||
// The size of the corresponding amino-encoded TxMessage
|
||||
// can't be larger than the maxMsgSize, otherwise we can't
|
||||
// relay it to peers.
|
||||
if len(tx) > maxTxSize {
|
||||
return ErrTxTooLarge
|
||||
if max := calcMaxTxSize(mem.config.MaxMsgBytes); txSize > max {
|
||||
return ErrTxTooLarge{max, txSize}
|
||||
}
|
||||
|
||||
if mem.preCheck != nil {
|
||||
|
@ -426,6 +426,9 @@ func TestMempoolMaxMsgSize(t *testing.T) {
|
||||
mempl, cleanup := newMempoolWithApp(cc)
|
||||
defer cleanup()
|
||||
|
||||
maxMsgSize := mempl.config.MaxMsgBytes
|
||||
maxTxSize := calcMaxTxSize(mempl.config.MaxMsgBytes)
|
||||
|
||||
testCases := []struct {
|
||||
len int
|
||||
err bool
|
||||
@ -462,7 +465,7 @@ func TestMempoolMaxMsgSize(t *testing.T) {
|
||||
require.NoError(t, err, caseString)
|
||||
} else {
|
||||
require.True(t, len(encoded) > maxMsgSize, caseString)
|
||||
require.Equal(t, err, ErrTxTooLarge, caseString)
|
||||
require.Equal(t, err, ErrTxTooLarge{maxTxSize, testCase.len}, caseString)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -9,11 +9,18 @@ import (
|
||||
var (
|
||||
// ErrTxInCache is returned to the client if we saw tx earlier
|
||||
ErrTxInCache = errors.New("Tx already exists in cache")
|
||||
|
||||
// ErrTxTooLarge means the tx is too big to be sent in a message to other peers
|
||||
ErrTxTooLarge = fmt.Errorf("Tx too large. Max size is %d", maxTxSize)
|
||||
)
|
||||
|
||||
// ErrTxTooLarge means the tx is too big to be sent in a message to other peers
|
||||
type ErrTxTooLarge struct {
|
||||
max int
|
||||
actual int
|
||||
}
|
||||
|
||||
func (e ErrTxTooLarge) Error() string {
|
||||
return fmt.Sprintf("Tx too large. Max size is %d, but got %d", e.max, e.actual)
|
||||
}
|
||||
|
||||
// ErrMempoolIsFull means Tendermint & an application can't handle that much load
|
||||
type ErrMempoolIsFull struct {
|
||||
numTxs int
|
||||
|
@ -19,8 +19,7 @@ import (
|
||||
const (
|
||||
MempoolChannel = byte(0x30)
|
||||
|
||||
maxMsgSize = 1048576 // 1MB TODO make it configurable
|
||||
maxTxSize = maxMsgSize - 8 // account for amino overhead of TxMessage
|
||||
aminoOverheadForTxMessage = 8
|
||||
|
||||
peerCatchupSleepIntervalMS = 100 // If peer is behind, sleep this amount
|
||||
|
||||
@ -156,7 +155,7 @@ func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) {
|
||||
// Receive implements Reactor.
|
||||
// It adds any received transactions to the mempool.
|
||||
func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) {
|
||||
msg, err := decodeMsg(msgBytes)
|
||||
msg, err := memR.decodeMsg(msgBytes)
|
||||
if err != nil {
|
||||
memR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes)
|
||||
memR.Switch.StopPeerForError(src, err)
|
||||
@ -263,9 +262,9 @@ func RegisterMempoolMessages(cdc *amino.Codec) {
|
||||
cdc.RegisterConcrete(&TxMessage{}, "tendermint/mempool/TxMessage", nil)
|
||||
}
|
||||
|
||||
func decodeMsg(bz []byte) (msg MempoolMessage, err error) {
|
||||
if len(bz) > maxMsgSize {
|
||||
return msg, fmt.Errorf("Msg exceeds max size (%d > %d)", len(bz), maxMsgSize)
|
||||
func (memR *Reactor) decodeMsg(bz []byte) (msg MempoolMessage, err error) {
|
||||
if l := len(bz); l > memR.config.MaxMsgBytes {
|
||||
return msg, ErrTxTooLarge{memR.config.MaxMsgBytes, l}
|
||||
}
|
||||
err = cdc.UnmarshalBinaryBare(bz, &msg)
|
||||
return
|
||||
@ -282,3 +281,9 @@ type TxMessage struct {
|
||||
func (m *TxMessage) String() string {
|
||||
return fmt.Sprintf("[TxMessage %v]", m.Tx)
|
||||
}
|
||||
|
||||
// calcMaxTxSize returns the max size of Tx
|
||||
// account for amino overhead of TxMessage
|
||||
func calcMaxTxSize(maxMsgSize int) int {
|
||||
return maxMsgSize - aminoOverheadForTxMessage
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user