diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index a3c8c747..c57b6b82 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -24,6 +24,9 @@ program](https://hackerone.com/tendermint). - [rpc] \#2252 Add `/broadcast_evidence` endpoint to submit double signing and other types of evidence - [rpc] \#3818 Make `max_body_bytes` and `max_header_bytes` configurable - [p2p] \#3664 p2p/conn: reuse buffer when write/read from secret connection +- [mempool] \#3826 Make `max_msg_bytes` configurable - [blockchain] \#3561 Add early version of the new blockchain reactor, which is supposed to be more modular and testable compared to the old version. To try it, you'll have to change `version` in the config file, [here](https://github.com/tendermint/tendermint/blob/master/config/toml.go#L303) NOTE: It's not ready for a production yet. For further information, see [ADR-40](https://github.com/tendermint/tendermint/blob/master/docs/architecture/adr-040-blockchain-reactor-refactor.md) & [ADR-43](https://github.com/tendermint/tendermint/blob/master/docs/architecture/adr-043-blockchain-riri-org.md) ### BUG FIXES: + +- [rpc] \#3813 Return err if page is incorrect (less than 0 or greater than total pages) diff --git a/config/config.go b/config/config.go index cefe6ba6..b00702ce 100644 --- a/config/config.go +++ b/config/config.go @@ -637,6 +637,7 @@ type MempoolConfig struct { Size int `mapstructure:"size"` MaxTxsBytes int64 `mapstructure:"max_txs_bytes"` CacheSize int `mapstructure:"cache_size"` + MaxMsgBytes int `mapstructure:"max_msg_bytes"` } // DefaultMempoolConfig returns a default configuration for the Tendermint mempool @@ -650,6 +651,7 @@ func DefaultMempoolConfig() *MempoolConfig { Size: 5000, MaxTxsBytes: 1024 * 1024 * 1024, // 1GB CacheSize: 10000, + MaxMsgBytes: 1024 * 1024, // 1MB } } @@ -682,6 +684,9 @@ func (cfg *MempoolConfig) ValidateBasic() error { if cfg.CacheSize < 0 { return errors.New("cache_size can't be negative") } + if cfg.MaxMsgBytes < 0 { + return errors.New("max_msg_bytes can't be negative") + } return nil } diff --git a/config/toml.go b/config/toml.go index 6847342e..5679a1ca 100644 --- a/config/toml.go +++ b/config/toml.go @@ -302,6 +302,9 @@ cache_size = {{ .Mempool.CacheSize }} # 2) "v1" - refactor of v0 version for better testability version = "{{ .FastSync.Version }}" +# Limit the size of TxMessage +max_msg_bytes = {{ .Mempool.MaxMsgBytes }} + ##### consensus configuration options ##### [consensus] diff --git a/docs/tendermint-core/configuration.md b/docs/tendermint-core/configuration.md index eea67766..20495814 100644 --- a/docs/tendermint-core/configuration.md +++ b/docs/tendermint-core/configuration.md @@ -248,6 +248,9 @@ cache_size = 10000 # 2) "v1" - refactor of v0 version for better testability version = "v0" +# Limit the size of TxMessage +max_msg_bytes = 1048576 + ##### consensus configuration options ##### [consensus] diff --git a/mempool/clist_mempool.go b/mempool/clist_mempool.go index 4042e9b4..81123cb6 100644 --- a/mempool/clist_mempool.go +++ b/mempool/clist_mempool.go @@ -220,9 +220,10 @@ func (mem *CListMempool) CheckTxWithInfo(tx types.Tx, cb func(*abci.Response), t var ( memSize = mem.Size() txsBytes = mem.TxsBytes() + txSize = len(tx) ) if memSize >= mem.config.Size || - int64(len(tx))+txsBytes > mem.config.MaxTxsBytes { + int64(txSize)+txsBytes > mem.config.MaxTxsBytes { return ErrMempoolIsFull{ memSize, mem.config.Size, txsBytes, mem.config.MaxTxsBytes} @@ -231,8 +232,8 @@ func (mem *CListMempool) CheckTxWithInfo(tx types.Tx, cb func(*abci.Response), t // The size of the corresponding amino-encoded TxMessage // can't be larger than the maxMsgSize, otherwise we can't // relay it to peers. - if len(tx) > maxTxSize { - return ErrTxTooLarge + if max := calcMaxTxSize(mem.config.MaxMsgBytes); txSize > max { + return ErrTxTooLarge{max, txSize} } if mem.preCheck != nil { diff --git a/mempool/clist_mempool_test.go b/mempool/clist_mempool_test.go index 90d0ed1a..e2ebca92 100644 --- a/mempool/clist_mempool_test.go +++ b/mempool/clist_mempool_test.go @@ -426,6 +426,9 @@ func TestMempoolMaxMsgSize(t *testing.T) { mempl, cleanup := newMempoolWithApp(cc) defer cleanup() + maxMsgSize := mempl.config.MaxMsgBytes + maxTxSize := calcMaxTxSize(mempl.config.MaxMsgBytes) + testCases := []struct { len int err bool @@ -462,7 +465,7 @@ func TestMempoolMaxMsgSize(t *testing.T) { require.NoError(t, err, caseString) } else { require.True(t, len(encoded) > maxMsgSize, caseString) - require.Equal(t, err, ErrTxTooLarge, caseString) + require.Equal(t, err, ErrTxTooLarge{maxTxSize, testCase.len}, caseString) } } diff --git a/mempool/errors.go b/mempool/errors.go index ac2a9b3c..c5140bdf 100644 --- a/mempool/errors.go +++ b/mempool/errors.go @@ -9,11 +9,18 @@ import ( var ( // ErrTxInCache is returned to the client if we saw tx earlier ErrTxInCache = errors.New("Tx already exists in cache") - - // ErrTxTooLarge means the tx is too big to be sent in a message to other peers - ErrTxTooLarge = fmt.Errorf("Tx too large. Max size is %d", maxTxSize) ) +// ErrTxTooLarge means the tx is too big to be sent in a message to other peers +type ErrTxTooLarge struct { + max int + actual int +} + +func (e ErrTxTooLarge) Error() string { + return fmt.Sprintf("Tx too large. Max size is %d, but got %d", e.max, e.actual) +} + // ErrMempoolIsFull means Tendermint & an application can't handle that much load type ErrMempoolIsFull struct { numTxs int diff --git a/mempool/reactor.go b/mempool/reactor.go index 65ccd7df..0ca27340 100644 --- a/mempool/reactor.go +++ b/mempool/reactor.go @@ -19,8 +19,7 @@ import ( const ( MempoolChannel = byte(0x30) - maxMsgSize = 1048576 // 1MB TODO make it configurable - maxTxSize = maxMsgSize - 8 // account for amino overhead of TxMessage + aminoOverheadForTxMessage = 8 peerCatchupSleepIntervalMS = 100 // If peer is behind, sleep this amount @@ -156,7 +155,7 @@ func (memR *Reactor) RemovePeer(peer p2p.Peer, reason interface{}) { // Receive implements Reactor. // It adds any received transactions to the mempool. func (memR *Reactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { - msg, err := decodeMsg(msgBytes) + msg, err := memR.decodeMsg(msgBytes) if err != nil { memR.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes) memR.Switch.StopPeerForError(src, err) @@ -263,9 +262,9 @@ func RegisterMempoolMessages(cdc *amino.Codec) { cdc.RegisterConcrete(&TxMessage{}, "tendermint/mempool/TxMessage", nil) } -func decodeMsg(bz []byte) (msg MempoolMessage, err error) { - if len(bz) > maxMsgSize { - return msg, fmt.Errorf("Msg exceeds max size (%d > %d)", len(bz), maxMsgSize) +func (memR *Reactor) decodeMsg(bz []byte) (msg MempoolMessage, err error) { + if l := len(bz); l > memR.config.MaxMsgBytes { + return msg, ErrTxTooLarge{memR.config.MaxMsgBytes, l} } err = cdc.UnmarshalBinaryBare(bz, &msg) return @@ -282,3 +281,9 @@ type TxMessage struct { func (m *TxMessage) String() string { return fmt.Sprintf("[TxMessage %v]", m.Tx) } + +// calcMaxTxSize returns the max size of Tx +// account for amino overhead of TxMessage +func calcMaxTxSize(maxMsgSize int) int { + return maxMsgSize - aminoOverheadForTxMessage +} diff --git a/rpc/core/pipe.go b/rpc/core/pipe.go index a0fc7b9b..9581b89d 100644 --- a/rpc/core/pipe.go +++ b/rpc/core/pipe.go @@ -1,6 +1,7 @@ package core import ( + "fmt" "time" cfg "github.com/tendermint/tendermint/config" @@ -145,19 +146,24 @@ func SetConfig(c cfg.RPCConfig) { config = c } -func validatePage(page, perPage, totalCount int) int { +func validatePage(page, perPage, totalCount int) (int, error) { if perPage < 1 { - return 1 + panic(fmt.Sprintf("zero or negative perPage: %d", perPage)) + } + + if page == 0 { + return 1, nil // default } pages := ((totalCount - 1) / perPage) + 1 - if page < 1 { - page = 1 - } else if page > pages { - page = pages + if pages == 0 { + pages = 1 // one page (even if it's empty) + } + if page < 0 || page > pages { + return 1, fmt.Errorf("page should be within [0, %d] range, given %d", pages, page) } - return page + return page, nil } func validatePerPage(perPage int) int { diff --git a/rpc/core/pipe_test.go b/rpc/core/pipe_test.go index 19ed11fc..93aff3e5 100644 --- a/rpc/core/pipe_test.go +++ b/rpc/core/pipe_test.go @@ -14,33 +14,39 @@ func TestPaginationPage(t *testing.T) { perPage int page int newPage int + expErr bool }{ - {0, 0, 1, 1}, + {0, 10, 1, 1, false}, - {0, 10, 0, 1}, - {0, 10, 1, 1}, - {0, 10, 2, 1}, + {0, 10, 0, 1, false}, + {0, 10, 1, 1, false}, + {0, 10, 2, 0, true}, - {5, 10, -1, 1}, - {5, 10, 0, 1}, - {5, 10, 1, 1}, - {5, 10, 2, 1}, - {5, 10, 2, 1}, + {5, 10, -1, 0, true}, + {5, 10, 0, 1, false}, + {5, 10, 1, 1, false}, + {5, 10, 2, 0, true}, + {5, 10, 2, 0, true}, - {5, 5, 1, 1}, - {5, 5, 2, 1}, - {5, 5, 3, 1}, + {5, 5, 1, 1, false}, + {5, 5, 2, 0, true}, + {5, 5, 3, 0, true}, - {5, 3, 2, 2}, - {5, 3, 3, 2}, + {5, 3, 2, 2, false}, + {5, 3, 3, 0, true}, - {5, 2, 2, 2}, - {5, 2, 3, 3}, - {5, 2, 4, 3}, + {5, 2, 2, 2, false}, + {5, 2, 3, 3, false}, + {5, 2, 4, 0, true}, } for _, c := range cases { - p := validatePage(c.page, c.perPage, c.totalCount) + p, err := validatePage(c.page, c.perPage, c.totalCount) + if c.expErr { + assert.Error(t, err) + continue + } + assert.Equal(t, c.newPage, p, fmt.Sprintf("%v", c)) } diff --git a/rpc/core/tx.go b/rpc/core/tx.go index 575553f8..dba457c3 100644 --- a/rpc/core/tx.go +++ b/rpc/core/tx.go @@ -202,7 +202,10 @@ func TxSearch(ctx *rpctypes.Context, query string, prove bool, page, perPage int totalCount := len(results) perPage = validatePerPage(perPage) - page = validatePage(page, perPage, totalCount) + page, err = validatePage(page, perPage, totalCount) + if err != nil { + return nil, err + } skipCount := validateSkipCount(page, perPage) apiResults := make([]*ctypes.ResultTx, cmn.MinInt(perPage, totalCount-skipCount))