diff --git a/blockchain/reactor.go b/blockchain/reactor.go index e4b63ca2..fa3c7079 100644 --- a/blockchain/reactor.go +++ b/blockchain/reactor.go @@ -28,7 +28,6 @@ const ( statusUpdateIntervalSeconds = 10 // check if we should switch to consensus reactor switchToConsensusIntervalSeconds = 1 - maxBlockchainResponseSize = types.MaxBlockSize + 2 // TODO ) type consensusReactor interface { @@ -124,7 +123,7 @@ func (bcR *BlockchainReactor) RemovePeer(peer p2p.Peer, reason interface{}) { // Receive implements Reactor by handling 4 types of messages (look below). func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) { - _, msg, err := DecodeMessage(msgBytes) + _, msg, err := DecodeMessage(msgBytes, bcR.maxMsgSize()) if err != nil { bcR.Logger.Error("Error decoding message", "err", err) return @@ -163,6 +162,12 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte) } } +// maxMsgSize returns the maximum allowable size of a +// message on the blockchain reactor. +func (bcR *BlockchainReactor) maxMsgSize() int { + return bcR.state.GenesisDoc.ConsensusParams.MaxBlockSizeBytes + 2 +} + // Handle messages from the poolReactor telling the reactor what to do. // NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down! // (Except for the SYNC_LOOP, which is the primary purpose and must be synchronous.) @@ -221,7 +226,7 @@ FOR_LOOP: // We need both to sync the first block. break SYNC_LOOP } - firstParts := first.MakePartSet(types.DefaultBlockPartSize) + firstParts := first.MakePartSet(bcR.state.GenesisDoc.ConsensusParams.BlockPartSizeBytes) firstPartsHeader := firstParts.Header() // Finally, verify the first block using the second's commit // NOTE: we can probably make this more efficient, but note that calling @@ -290,11 +295,11 @@ var _ = wire.RegisterInterface( // DecodeMessage decodes BlockchainMessage. // TODO: ensure that bz is completely read. -func DecodeMessage(bz []byte) (msgType byte, msg BlockchainMessage, err error) { +func DecodeMessage(bz []byte, maxSize int) (msgType byte, msg BlockchainMessage, err error) { msgType = bz[0] n := int(0) r := bytes.NewReader(bz) - msg = wire.ReadBinary(struct{ BlockchainMessage }{}, r, maxBlockchainResponseSize, &n, &err).(struct{ BlockchainMessage }).BlockchainMessage + msg = wire.ReadBinary(struct{ BlockchainMessage }{}, r, maxSize, &n, &err).(struct{ BlockchainMessage }).BlockchainMessage if err != nil && n != len(bz) { err = errors.New("DecodeMessage() had bytes left over") } diff --git a/config/config.go b/config/config.go index ec1f85ed..23da4f40 100644 --- a/config/config.go +++ b/config/config.go @@ -4,8 +4,6 @@ import ( "fmt" "path/filepath" "time" - - "github.com/tendermint/tendermint/types" // TODO: remove ) // Config defines the top level configuration for a Tendermint node @@ -320,10 +318,6 @@ type ConsensusConfig struct { CreateEmptyBlocks bool `mapstructure:"create_empty_blocks"` CreateEmptyBlocksInterval int `mapstructure:"create_empty_blocks_interval"` - // TODO: This probably shouldn't be exposed but it makes it - // easy to write tests for the wal/replay - BlockPartSize int `mapstructure:"block_part_size"` - // Reactor sleep duration parameters are in ms PeerGossipSleepDuration int `mapstructure:"peer_gossip_sleep_duration"` PeerQueryMaj23SleepDuration int `mapstructure:"peer_query_maj23_sleep_duration"` @@ -386,7 +380,6 @@ func DefaultConsensusConfig() *ConsensusConfig { MaxBlockSizeBytes: 1, // TODO CreateEmptyBlocks: true, CreateEmptyBlocksInterval: 0, - BlockPartSize: types.DefaultBlockPartSize, // TODO: we shouldnt be importing types PeerGossipSleepDuration: 100, PeerQueryMaj23SleepDuration: 2000, } diff --git a/config/consensus.go b/config/consensus.go new file mode 100644 index 00000000..5b10911d --- /dev/null +++ b/config/consensus.go @@ -0,0 +1,27 @@ +package config + +import ( + "fmt" +) + +type ConsensusParams struct { + MaxBlockSizeBytes int `json:"max_block_size_bytes"` + BlockPartSizeBytes int `json:"block_part_size_bytes"` +} + +func DefaultConsensusParams() *ConsensusParams { + return &ConsensusParams{ + MaxBlockSizeBytes: 22020096, // 21MB + BlockPartSizeBytes: 65536, // 64kB, + } +} + +func (params *ConsensusParams) Validate() error { + if params.MaxBlockSizeBytes <= 0 { + return fmt.Errorf("MaxBlockSizeBytes must be greater than 0. Got %d", params.MaxBlockSizeBytes) + } + if params.BlockPartSizeBytes <= 0 { + return fmt.Errorf("BlockPartSizeBytes must be greater than 0. Got %d", params.BlockPartSizeBytes) + } + return nil +} diff --git a/consensus/replay_test.go b/consensus/replay_test.go index 78cdaf7b..179e8696 100644 --- a/consensus/replay_test.go +++ b/consensus/replay_test.go @@ -267,8 +267,6 @@ func testReplayCrashBeforeWriteVote(t *testing.T, thisCase *testCase, lineNum in var ( NUM_BLOCKS = 6 // number of blocks in the test_data/many_blocks.cswal mempool = types.MockMempool{} - - testPartSize int ) //--------------------------------------- @@ -320,7 +318,6 @@ func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) { config.Consensus.SetWalFile(walFile) privVal := types.LoadPrivValidator(config.PrivValidatorFile()) - testPartSize = config.Consensus.BlockPartSize wal, err := NewWAL(walFile, false) if err != nil { @@ -384,6 +381,7 @@ func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) { } func applyBlock(st *sm.State, blk *types.Block, proxyApp proxy.AppConns) { + testPartSize := st.GenesisDoc.ConsensusParams.BlockPartSizeBytes err := st.ApplyBlock(nil, proxyApp.Consensus(), blk, blk.MakePartSet(testPartSize).Header(), mempool) if err != nil { panic(err) @@ -503,7 +501,7 @@ func makeBlockchainFromWAL(wal *WAL) ([]*types.Block, []*types.Commit, error) { // if its not the first one, we have a full block if blockParts != nil { var n int - block := wire.ReadBinary(&types.Block{}, blockParts.GetReader(), types.MaxBlockSize, &n, &err).(*types.Block) + block := wire.ReadBinary(&types.Block{}, blockParts.GetReader(), 0, &n, &err).(*types.Block) blocks = append(blocks, block) } blockParts = types.NewPartSetFromHeader(*p) @@ -524,7 +522,7 @@ func makeBlockchainFromWAL(wal *WAL) ([]*types.Block, []*types.Commit, error) { } // grab the last block too var n int - block := wire.ReadBinary(&types.Block{}, blockParts.GetReader(), types.MaxBlockSize, &n, &err).(*types.Block) + block := wire.ReadBinary(&types.Block{}, blockParts.GetReader(), 0, &n, &err).(*types.Block) blocks = append(blocks, block) return blocks, commits, nil } @@ -563,7 +561,7 @@ func stateAndStore(config *cfg.Config, pubKey crypto.PubKey) (*sm.State, *mockBl state := sm.MakeGenesisStateFromFile(stateDB, config.GenesisFile()) state.SetLogger(log.TestingLogger().With("module", "state")) - store := NewMockBlockStore(config) + store := NewMockBlockStore(config, state.GenesisDoc.ConsensusParams) return state, store } @@ -572,13 +570,14 @@ func stateAndStore(config *cfg.Config, pubKey crypto.PubKey) (*sm.State, *mockBl type mockBlockStore struct { config *cfg.Config + params *cfg.ConsensusParams chain []*types.Block commits []*types.Commit } // TODO: NewBlockStore(db.NewMemDB) ... -func NewMockBlockStore(config *cfg.Config) *mockBlockStore { - return &mockBlockStore{config, nil, nil} +func NewMockBlockStore(config *cfg.Config, params *cfg.ConsensusParams) *mockBlockStore { + return &mockBlockStore{config, params, nil, nil} } func (bs *mockBlockStore) Height() int { return len(bs.chain) } @@ -586,7 +585,7 @@ func (bs *mockBlockStore) LoadBlock(height int) *types.Block { return bs.chain[h func (bs *mockBlockStore) LoadBlockMeta(height int) *types.BlockMeta { block := bs.chain[height-1] return &types.BlockMeta{ - BlockID: types.BlockID{block.Hash(), block.MakePartSet(bs.config.Consensus.BlockPartSize).Header()}, + BlockID: types.BlockID{block.Hash(), block.MakePartSet(bs.params.BlockPartSizeBytes).Header()}, Header: block.Header, } } diff --git a/consensus/state.go b/consensus/state.go index 2523aac6..8a19a052 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -983,7 +983,8 @@ func (cs *ConsensusState) createProposalBlock() (block *types.Block, blockParts txs := cs.mempool.Reap(cs.config.MaxBlockSizeTxs) return types.MakeBlock(cs.Height, cs.state.ChainID, txs, commit, - cs.state.LastBlockID, cs.state.Validators.Hash(), cs.state.AppHash, cs.config.BlockPartSize) + cs.state.LastBlockID, cs.state.Validators.Hash(), + cs.state.AppHash, cs.state.GenesisDoc.ConsensusParams.BlockPartSizeBytes) } // Enter: `timeoutPropose` after entering Propose. @@ -1417,7 +1418,8 @@ func (cs *ConsensusState) addProposalBlockPart(height int, part *types.Part, ver // Added and completed! var n int var err error - cs.ProposalBlock = wire.ReadBinary(&types.Block{}, cs.ProposalBlockParts.GetReader(), types.MaxBlockSize, &n, &err).(*types.Block) + cs.ProposalBlock = wire.ReadBinary(&types.Block{}, cs.ProposalBlockParts.GetReader(), + cs.state.GenesisDoc.ConsensusParams.MaxBlockSizeBytes, &n, &err).(*types.Block) // NOTE: it's possible to receive complete proposal blocks for future rounds without having the proposal cs.Logger.Info("Received complete proposal block", "height", cs.ProposalBlock.Height, "hash", cs.ProposalBlock.Hash()) if cs.Step == RoundStepPropose && cs.isProposalComplete() { diff --git a/consensus/state_test.go b/consensus/state_test.go index 81ef016b..6f804fcb 100644 --- a/consensus/state_test.go +++ b/consensus/state_test.go @@ -180,7 +180,7 @@ func TestBadProposal(t *testing.T) { height, round := cs1.Height, cs1.Round vs2 := vss[1] - partSize := config.Consensus.BlockPartSize + partSize := cs1.state.GenesisDoc.ConsensusParams.BlockPartSizeBytes proposalCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringCompleteProposal(), 1) voteCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringVote(), 1) @@ -327,7 +327,7 @@ func TestLockNoPOL(t *testing.T) { vs2 := vss[1] height := cs1.Height - partSize := config.Consensus.BlockPartSize + partSize := cs1.state.GenesisDoc.ConsensusParams.BlockPartSizeBytes timeoutProposeCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutPropose(), 1) timeoutWaitCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutWait(), 1) @@ -493,7 +493,7 @@ func TestLockPOLRelock(t *testing.T) { cs1, vss := randConsensusState(4) vs2, vs3, vs4 := vss[1], vss[2], vss[3] - partSize := config.Consensus.BlockPartSize + partSize := cs1.state.GenesisDoc.ConsensusParams.BlockPartSizeBytes timeoutProposeCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutPropose(), 1) timeoutWaitCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutWait(), 1) @@ -608,7 +608,7 @@ func TestLockPOLUnlock(t *testing.T) { cs1, vss := randConsensusState(4) vs2, vs3, vs4 := vss[1], vss[2], vss[3] - partSize := config.Consensus.BlockPartSize + partSize := cs1.state.GenesisDoc.ConsensusParams.BlockPartSizeBytes proposalCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringCompleteProposal(), 1) timeoutProposeCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutPropose(), 1) @@ -703,7 +703,7 @@ func TestLockPOLSafety1(t *testing.T) { cs1, vss := randConsensusState(4) vs2, vs3, vs4 := vss[1], vss[2], vss[3] - partSize := config.Consensus.BlockPartSize + partSize := cs1.state.GenesisDoc.ConsensusParams.BlockPartSizeBytes proposalCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringCompleteProposal(), 1) timeoutProposeCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutPropose(), 1) @@ -824,7 +824,7 @@ func TestLockPOLSafety2(t *testing.T) { cs1, vss := randConsensusState(4) vs2, vs3, vs4 := vss[1], vss[2], vss[3] - partSize := config.Consensus.BlockPartSize + partSize := cs1.state.GenesisDoc.ConsensusParams.BlockPartSizeBytes proposalCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringCompleteProposal(), 1) timeoutProposeCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutPropose(), 1) @@ -999,7 +999,7 @@ func TestHalt1(t *testing.T) { cs1, vss := randConsensusState(4) vs2, vs3, vs4 := vss[1], vss[2], vss[3] - partSize := config.Consensus.BlockPartSize + partSize := cs1.state.GenesisDoc.ConsensusParams.BlockPartSizeBytes proposalCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringCompleteProposal(), 1) timeoutWaitCh := subscribeToEvent(cs1.evsw, "tester", types.EventStringTimeoutWait(), 1) diff --git a/types/block.go b/types/block.go index fee62e98..c8cdf81a 100644 --- a/types/block.go +++ b/types/block.go @@ -14,11 +14,6 @@ import ( "github.com/tendermint/tmlibs/merkle" ) -const ( - MaxBlockSize = 22020096 // 21MB TODO make it configurable - DefaultBlockPartSize = 65536 // 64kB TODO: put part size in parts header? -) - // Block defines the atomic unit of a Tendermint blockchain type Block struct { *Header `json:"header"` diff --git a/types/genesis.go b/types/genesis.go index 17078b6f..b7762c57 100644 --- a/types/genesis.go +++ b/types/genesis.go @@ -10,6 +10,8 @@ import ( "github.com/tendermint/go-crypto" "github.com/tendermint/go-wire/data" cmn "github.com/tendermint/tmlibs/common" + + cfg "github.com/tendermint/tendermint/config" ) //------------------------------------------------------------ @@ -24,10 +26,11 @@ type GenesisValidator struct { // GenesisDoc defines the initial conditions for a tendermint blockchain, in particular its validator set. type GenesisDoc struct { - GenesisTime time.Time `json:"genesis_time"` - ChainID string `json:"chain_id"` - Validators []GenesisValidator `json:"validators"` - AppHash data.Bytes `json:"app_hash"` + GenesisTime time.Time `json:"genesis_time"` + ChainID string `json:"chain_id"` + ConsensusParams *cfg.ConsensusParams `json:"consensus_params"` + Validators []GenesisValidator `json:"validators"` + AppHash data.Bytes `json:"app_hash"` } // SaveAs is a utility method for saving GenensisDoc as a JSON file. @@ -56,6 +59,19 @@ func (genDoc *GenesisDoc) ValidatorHash() []byte { func GenesisDocFromJSON(jsonBlob []byte) (*GenesisDoc, error) { genDoc := GenesisDoc{} err := json.Unmarshal(jsonBlob, &genDoc) + + // validate genesis + if genDoc.ChainID == "" { + return nil, errors.Errorf("Genesis doc must include non-empty chain_id") + } + if genDoc.ConsensusParams == nil { + genDoc.ConsensusParams = cfg.DefaultConsensusParams() + } else { + if err := genDoc.ConsensusParams.Validate(); err != nil { + return nil, err + } + } + return &genDoc, err } @@ -67,10 +83,7 @@ func GenesisDocFromFile(genDocFile string) (*GenesisDoc, error) { } genDoc, err := GenesisDocFromJSON(jsonBlob) if err != nil { - return nil, errors.Wrap(err, "Error reading GenesisDoc") - } - if genDoc.ChainID == "" { - return nil, errors.Errorf("Genesis doc %v must include non-empty chain_id", genDocFile) + return nil, errors.Wrap(err, cmn.Fmt("Error reading GenesisDoc at %v", genDocFile)) } return genDoc, nil }