Merge pull request #1015 from tendermint/state_funcs

state: move methods to funcs
This commit is contained in:
Ethan Buchman 2017-12-29 10:24:22 -05:00 committed by GitHub
commit 07eeddc5e1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 1121 additions and 961 deletions

View File

@ -3,9 +3,7 @@
## Roadmap ## Roadmap
BREAKING CHANGES: BREAKING CHANGES:
- Upgrade the header to support better proofs on validtors, results, evidence, and possibly more
- Better support for injecting randomness - Better support for injecting randomness
- Pass evidence/voteInfo through ABCI
- Upgrade consensus for more real-time use of evidence - Upgrade consensus for more real-time use of evidence
FEATURES: FEATURES:
@ -32,6 +30,27 @@ BUG FIXES:
BREAKING CHANGES: BREAKING CHANGES:
- [p2p] enable the Peer Exchange reactor by default - [p2p] enable the Peer Exchange reactor by default
- [types] add Timestamp field to Proposal/Vote - [types] add Timestamp field to Proposal/Vote
- [types] add new fields to Header: TotalTxs, ConsensusParamsHash, LastResultsHash, EvidenceHash
- [types] add Evidence to Block
- [types] simplify ValidateBasic
- [state] updates to support changes to the header
- [state] Enforce <1/3 of validator set can change at a time
FEATURES:
- [state] Send indices of absent validators and addresses of byzantine validators in BeginBlock
- [state] Historical ConsensusParams and ABCIResponses
- [docs] Specification for the base Tendermint data structures.
- [evidence] New evidence reactor for gossiping and managing evidence
- [rpc] `/block_results?height=X` returns the DeliverTx results for a given height.
IMPROVEMENTS:
- [consensus] Better handling of corrupt WAL file
BUG FIXES:
- [lite] fix race
- [state] validate block.Header.ValidatorsHash
- [p2p] allow seed addresses to be prefixed with eg. `tcp://`
- [cmd] fix `tendermint init` to ignore files that are there and generate files that aren't.
## 0.14.0 (December 11, 2017) ## 0.14.0 (December 11, 2017)

View File

@ -4,11 +4,11 @@ import (
"bytes" "bytes"
"errors" "errors"
"reflect" "reflect"
"sync"
"time" "time"
wire "github.com/tendermint/go-wire" wire "github.com/tendermint/go-wire"
"github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/proxy"
sm "github.com/tendermint/tendermint/state" sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
cmn "github.com/tendermint/tmlibs/common" cmn "github.com/tendermint/tmlibs/common"
@ -34,29 +34,33 @@ const (
type consensusReactor interface { type consensusReactor interface {
// for when we switch from blockchain reactor and fast sync to // for when we switch from blockchain reactor and fast sync to
// the consensus machine // the consensus machine
SwitchToConsensus(*sm.State, int) SwitchToConsensus(sm.State, int)
} }
// BlockchainReactor handles long-term catchup syncing. // BlockchainReactor handles long-term catchup syncing.
type BlockchainReactor struct { type BlockchainReactor struct {
p2p.BaseReactor p2p.BaseReactor
state *sm.State mtx sync.Mutex
proxyAppConn proxy.AppConnConsensus // same as consensus.proxyAppConn params types.ConsensusParams
store *BlockStore
pool *BlockPool
fastSync bool
requestsCh chan BlockRequest
timeoutsCh chan string
eventBus *types.EventBus // immutable
initialState sm.State
blockExec *sm.BlockExecutor
store *BlockStore
pool *BlockPool
fastSync bool
requestsCh chan BlockRequest
timeoutsCh chan string
} }
// NewBlockchainReactor returns new reactor instance. // NewBlockchainReactor returns new reactor instance.
func NewBlockchainReactor(state *sm.State, proxyAppConn proxy.AppConnConsensus, store *BlockStore, fastSync bool) *BlockchainReactor { func NewBlockchainReactor(state sm.State, blockExec *sm.BlockExecutor, store *BlockStore, fastSync bool) *BlockchainReactor {
if state.LastBlockHeight != store.Height() { if state.LastBlockHeight != store.Height() {
cmn.PanicSanity(cmn.Fmt("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height())) cmn.PanicSanity(cmn.Fmt("state (%v) and store (%v) height mismatch", state.LastBlockHeight, store.Height()))
} }
requestsCh := make(chan BlockRequest, defaultChannelCapacity) requestsCh := make(chan BlockRequest, defaultChannelCapacity)
timeoutsCh := make(chan string, defaultChannelCapacity) timeoutsCh := make(chan string, defaultChannelCapacity)
pool := NewBlockPool( pool := NewBlockPool(
@ -65,8 +69,9 @@ func NewBlockchainReactor(state *sm.State, proxyAppConn proxy.AppConnConsensus,
timeoutsCh, timeoutsCh,
) )
bcR := &BlockchainReactor{ bcR := &BlockchainReactor{
state: state, params: state.ConsensusParams,
proxyAppConn: proxyAppConn, initialState: state,
blockExec: blockExec,
store: store, store: store,
pool: pool, pool: pool,
fastSync: fastSync, fastSync: fastSync,
@ -183,7 +188,16 @@ func (bcR *BlockchainReactor) Receive(chID byte, src p2p.Peer, msgBytes []byte)
// maxMsgSize returns the maximum allowable size of a // maxMsgSize returns the maximum allowable size of a
// message on the blockchain reactor. // message on the blockchain reactor.
func (bcR *BlockchainReactor) maxMsgSize() int { func (bcR *BlockchainReactor) maxMsgSize() int {
return bcR.state.ConsensusParams.BlockSize.MaxBytes + 2 bcR.mtx.Lock()
defer bcR.mtx.Unlock()
return bcR.params.BlockSize.MaxBytes + 2
}
// updateConsensusParams updates the internal consensus params
func (bcR *BlockchainReactor) updateConsensusParams(params types.ConsensusParams) {
bcR.mtx.Lock()
defer bcR.mtx.Unlock()
bcR.params = params
} }
// Handle messages from the poolReactor telling the reactor what to do. // Handle messages from the poolReactor telling the reactor what to do.
@ -197,7 +211,8 @@ func (bcR *BlockchainReactor) poolRoutine() {
blocksSynced := 0 blocksSynced := 0
chainID := bcR.state.ChainID chainID := bcR.initialState.ChainID
state := bcR.initialState
lastHundred := time.Now() lastHundred := time.Now()
lastRate := 0.0 lastRate := 0.0
@ -236,7 +251,7 @@ FOR_LOOP:
bcR.pool.Stop() bcR.pool.Stop()
conR := bcR.Switch.Reactor("CONSENSUS").(consensusReactor) conR := bcR.Switch.Reactor("CONSENSUS").(consensusReactor)
conR.SwitchToConsensus(bcR.state, blocksSynced) conR.SwitchToConsensus(state, blocksSynced)
break FOR_LOOP break FOR_LOOP
} }
@ -251,14 +266,15 @@ FOR_LOOP:
// We need both to sync the first block. // We need both to sync the first block.
break SYNC_LOOP break SYNC_LOOP
} }
firstParts := first.MakePartSet(bcR.state.ConsensusParams.BlockPartSizeBytes) firstParts := first.MakePartSet(state.ConsensusParams.BlockPartSizeBytes)
firstPartsHeader := firstParts.Header() firstPartsHeader := firstParts.Header()
firstID := types.BlockID{first.Hash(), firstPartsHeader}
// Finally, verify the first block using the second's commit // Finally, verify the first block using the second's commit
// NOTE: we can probably make this more efficient, but note that calling // NOTE: we can probably make this more efficient, but note that calling
// first.Hash() doesn't verify the tx contents, so MakePartSet() is // first.Hash() doesn't verify the tx contents, so MakePartSet() is
// currently necessary. // currently necessary.
err := bcR.state.Validators.VerifyCommit( err := state.Validators.VerifyCommit(
chainID, types.BlockID{first.Hash(), firstPartsHeader}, first.Height, second.LastCommit) chainID, firstID, first.Height, second.LastCommit)
if err != nil { if err != nil {
bcR.Logger.Error("Error in validation", "err", err) bcR.Logger.Error("Error in validation", "err", err)
bcR.pool.RedoRequest(first.Height) bcR.pool.RedoRequest(first.Height)
@ -268,19 +284,20 @@ FOR_LOOP:
bcR.store.SaveBlock(first, firstParts, second.LastCommit) bcR.store.SaveBlock(first, firstParts, second.LastCommit)
// TODO: should we be firing events? need to fire NewBlock events manually ...
// NOTE: we could improve performance if we // NOTE: we could improve performance if we
// didn't make the app commit to disk every block // didn't make the app commit to disk every block
// ... but we would need a way to get the hash without it persisting // ... but we would need a way to get the hash without it persisting
err := bcR.state.ApplyBlock(bcR.eventBus, bcR.proxyAppConn, var err error
first, firstPartsHeader, state, err = bcR.blockExec.ApplyBlock(state, firstID, first)
types.MockMempool{}, types.MockEvidencePool{}) // TODO unmock!
if err != nil { if err != nil {
// TODO This is bad, are we zombie? // TODO This is bad, are we zombie?
cmn.PanicQ(cmn.Fmt("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err)) cmn.PanicQ(cmn.Fmt("Failed to process committed block (%d:%X): %v", first.Height, first.Hash(), err))
} }
blocksSynced += 1 blocksSynced += 1
// update the consensus params
bcR.updateConsensusParams(state.ConsensusParams)
if blocksSynced%100 == 0 { if blocksSynced%100 == 0 {
lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds()) lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds())
bcR.Logger.Info("Fast Sync Rate", "height", bcR.pool.height, bcR.Logger.Info("Fast Sync Rate", "height", bcR.pool.height,
@ -302,11 +319,6 @@ func (bcR *BlockchainReactor) BroadcastStatusRequest() error {
return nil return nil
} }
// SetEventBus sets event bus.
func (bcR *BlockchainReactor) SetEventBus(b *types.EventBus) {
bcR.eventBus = b
}
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
// Messages // Messages

View File

@ -10,19 +10,15 @@ import (
cfg "github.com/tendermint/tendermint/config" cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/p2p" "github.com/tendermint/tendermint/p2p"
"github.com/tendermint/tendermint/proxy"
sm "github.com/tendermint/tendermint/state" sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
func makeStateAndBlockStore(logger log.Logger) (*sm.State, *BlockStore) { func makeStateAndBlockStore(logger log.Logger) (sm.State, *BlockStore) {
config := cfg.ResetTestRoot("blockchain_reactor_test") config := cfg.ResetTestRoot("blockchain_reactor_test")
blockStore := NewBlockStore(dbm.NewMemDB()) blockStore := NewBlockStore(dbm.NewMemDB())
state, _ := sm.LoadStateFromDBOrGenesisFile(dbm.NewMemDB(), config.GenesisFile())
// Get State
state, _ := sm.GetState(dbm.NewMemDB(), config.GenesisFile())
state.SetLogger(logger.With("module", "state"))
state.Save()
return state, blockStore return state, blockStore
} }
@ -31,7 +27,10 @@ func newBlockchainReactor(logger log.Logger, maxBlockHeight int64) *BlockchainRe
// Make the blockchainReactor itself // Make the blockchainReactor itself
fastSync := true fastSync := true
bcReactor := NewBlockchainReactor(state.Copy(), nil, blockStore, fastSync) var nilApp proxy.AppConnConsensus
blockExec := sm.NewBlockExecutor(dbm.NewMemDB(), log.TestingLogger(), nilApp, types.MockMempool{}, types.MockEvidencePool{})
bcReactor := NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
bcReactor.SetLogger(logger.With("module", "blockchain")) bcReactor.SetLogger(logger.With("module", "blockchain"))
// Next: we need to set a switch in order for peers to be added in // Next: we need to set a switch in order for peers to be added in
@ -51,7 +50,7 @@ func newBlockchainReactor(logger log.Logger, maxBlockHeight int64) *BlockchainRe
func TestNoBlockMessageResponse(t *testing.T) { func TestNoBlockMessageResponse(t *testing.T) {
maxBlockHeight := int64(20) maxBlockHeight := int64(20)
bcr := newBlockchainReactor(log.NewNopLogger(), maxBlockHeight) bcr := newBlockchainReactor(log.TestingLogger(), maxBlockHeight)
bcr.Start() bcr.Start()
defer bcr.Stop() defer bcr.Stop()
@ -71,6 +70,8 @@ func TestNoBlockMessageResponse(t *testing.T) {
{100, false}, {100, false},
} }
// receive a request message from peer,
// wait to hear response
for _, tt := range tests { for _, tt := range tests {
reqBlockMsg := &bcBlockRequestMessage{tt.height} reqBlockMsg := &bcBlockRequestMessage{tt.height}
reqBlockBytes := wire.BinaryBytes(struct{ BlockchainMessage }{reqBlockMsg}) reqBlockBytes := wire.BinaryBytes(struct{ BlockchainMessage }{reqBlockMsg})
@ -104,7 +105,7 @@ func makeTxs(height int64) (txs []types.Tx) {
return txs return txs
} }
func makeBlock(height int64, state *sm.State) *types.Block { func makeBlock(height int64, state sm.State) *types.Block {
block, _ := state.MakeBlock(height, makeTxs(height), new(types.Commit)) block, _ := state.MakeBlock(height, makeTxs(height), new(types.Commit))
return block return block
} }

View File

@ -235,16 +235,16 @@ func subscribeToVoter(cs *ConsensusState, addr []byte) chan interface{} {
//------------------------------------------------------------------------------- //-------------------------------------------------------------------------------
// consensus states // consensus states
func newConsensusState(state *sm.State, pv types.PrivValidator, app abci.Application) *ConsensusState { func newConsensusState(state sm.State, pv types.PrivValidator, app abci.Application) *ConsensusState {
return newConsensusStateWithConfig(config, state, pv, app) return newConsensusStateWithConfig(config, state, pv, app)
} }
func newConsensusStateWithConfig(thisConfig *cfg.Config, state *sm.State, pv types.PrivValidator, app abci.Application) *ConsensusState { func newConsensusStateWithConfig(thisConfig *cfg.Config, state sm.State, pv types.PrivValidator, app abci.Application) *ConsensusState {
blockDB := dbm.NewMemDB() blockDB := dbm.NewMemDB()
return newConsensusStateWithConfigAndBlockStore(thisConfig, state, pv, app, blockDB) return newConsensusStateWithConfigAndBlockStore(thisConfig, state, pv, app, blockDB)
} }
func newConsensusStateWithConfigAndBlockStore(thisConfig *cfg.Config, state *sm.State, pv types.PrivValidator, app abci.Application, blockDB dbm.DB) *ConsensusState { func newConsensusStateWithConfigAndBlockStore(thisConfig *cfg.Config, state sm.State, pv types.PrivValidator, app abci.Application, blockDB dbm.DB) *ConsensusState {
// Get BlockStore // Get BlockStore
blockStore := bc.NewBlockStore(blockDB) blockStore := bc.NewBlockStore(blockDB)
@ -264,7 +264,9 @@ func newConsensusStateWithConfigAndBlockStore(thisConfig *cfg.Config, state *sm.
evpool := types.MockEvidencePool{} evpool := types.MockEvidencePool{}
// Make ConsensusReactor // Make ConsensusReactor
cs := NewConsensusState(thisConfig.Consensus, state, proxyAppConnCon, blockStore, mempool, evpool) stateDB := dbm.NewMemDB()
blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyAppConnCon, mempool, evpool)
cs := NewConsensusState(thisConfig.Consensus, state, blockExec, blockStore, mempool, evpool)
cs.SetLogger(log.TestingLogger()) cs.SetLogger(log.TestingLogger())
cs.SetPrivValidator(pv) cs.SetPrivValidator(pv)
@ -284,9 +286,7 @@ func loadPrivValidator(config *cfg.Config) *types.PrivValidatorFS {
} }
func fixedConsensusStateDummy(config *cfg.Config, logger log.Logger) *ConsensusState { func fixedConsensusStateDummy(config *cfg.Config, logger log.Logger) *ConsensusState {
stateDB := dbm.NewMemDB() state, _ := sm.MakeGenesisStateFromFile(config.GenesisFile())
state, _ := sm.MakeGenesisStateFromFile(stateDB, config.GenesisFile())
state.SetLogger(logger.With("module", "state"))
privValidator := loadPrivValidator(config) privValidator := loadPrivValidator(config)
cs := newConsensusState(state, privValidator, dummy.NewDummyApplication()) cs := newConsensusState(state, privValidator, dummy.NewDummyApplication())
cs.SetLogger(logger) cs.SetLogger(logger)
@ -354,10 +354,8 @@ func randConsensusNet(nValidators int, testName string, tickerFunc func() Timeou
css := make([]*ConsensusState, nValidators) css := make([]*ConsensusState, nValidators)
logger := consensusLogger() logger := consensusLogger()
for i := 0; i < nValidators; i++ { for i := 0; i < nValidators; i++ {
db := dbm.NewMemDB() // each state needs its own db stateDB := dbm.NewMemDB() // each state needs its own db
state, _ := sm.MakeGenesisState(db, genDoc) state, _ := sm.LoadStateFromDBOrGenesisDoc(stateDB, genDoc)
state.SetLogger(logger.With("module", "state", "validator", i))
state.Save()
thisConfig := ResetConfig(cmn.Fmt("%s_%d", testName, i)) thisConfig := ResetConfig(cmn.Fmt("%s_%d", testName, i))
for _, opt := range configOpts { for _, opt := range configOpts {
opt(thisConfig) opt(thisConfig)
@ -380,10 +378,8 @@ func randConsensusNetWithPeers(nValidators, nPeers int, testName string, tickerF
css := make([]*ConsensusState, nPeers) css := make([]*ConsensusState, nPeers)
logger := consensusLogger() logger := consensusLogger()
for i := 0; i < nPeers; i++ { for i := 0; i < nPeers; i++ {
db := dbm.NewMemDB() // each state needs its own db stateDB := dbm.NewMemDB() // each state needs its own db
state, _ := sm.MakeGenesisState(db, genDoc) state, _ := sm.LoadStateFromDBOrGenesisDoc(stateDB, genDoc)
state.SetLogger(logger.With("module", "state", "validator", i))
state.Save()
thisConfig := ResetConfig(cmn.Fmt("%s_%d", testName, i)) thisConfig := ResetConfig(cmn.Fmt("%s_%d", testName, i))
ensureDir(path.Dir(thisConfig.Consensus.WalFile()), 0700) // dir for wal ensureDir(path.Dir(thisConfig.Consensus.WalFile()), 0700) // dir for wal
var privVal types.PrivValidator var privVal types.PrivValidator
@ -437,12 +433,11 @@ func randGenesisDoc(numValidators int, randPower bool, minPower int64) (*types.G
}, privValidators }, privValidators
} }
func randGenesisState(numValidators int, randPower bool, minPower int64) (*sm.State, []*types.PrivValidatorFS) { func randGenesisState(numValidators int, randPower bool, minPower int64) (sm.State, []*types.PrivValidatorFS) {
genDoc, privValidators := randGenesisDoc(numValidators, randPower, minPower) genDoc, privValidators := randGenesisDoc(numValidators, randPower, minPower)
s0, _ := sm.MakeGenesisState(genDoc)
db := dbm.NewMemDB() db := dbm.NewMemDB()
s0, _ := sm.MakeGenesisState(db, genDoc) sm.SaveState(db, s0)
s0.SetLogger(log.TestingLogger().With("module", "state"))
s0.Save()
return s0, privValidators return s0, privValidators
} }

View File

@ -82,7 +82,7 @@ func (conR *ConsensusReactor) OnStop() {
// SwitchToConsensus switches from fast_sync mode to consensus mode. // SwitchToConsensus switches from fast_sync mode to consensus mode.
// It resets the state, turns off fast_sync, and starts the consensus state-machine // It resets the state, turns off fast_sync, and starts the consensus state-machine
func (conR *ConsensusReactor) SwitchToConsensus(state *sm.State, blocksSynced int) { func (conR *ConsensusReactor) SwitchToConsensus(state sm.State, blocksSynced int) {
conR.Logger.Info("SwitchToConsensus") conR.Logger.Info("SwitchToConsensus")
conR.conS.reconstructLastCommit(state) conR.conS.reconstructLastCommit(state)
// NOTE: The line below causes broadcastNewRoundStepRoutine() to // NOTE: The line below causes broadcastNewRoundStepRoutine() to

View File

@ -13,6 +13,7 @@ import (
abci "github.com/tendermint/abci/types" abci "github.com/tendermint/abci/types"
//auto "github.com/tendermint/tmlibs/autofile" //auto "github.com/tendermint/tmlibs/autofile"
cmn "github.com/tendermint/tmlibs/common" cmn "github.com/tendermint/tmlibs/common"
dbm "github.com/tendermint/tmlibs/db"
"github.com/tendermint/tmlibs/log" "github.com/tendermint/tmlibs/log"
"github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/proxy"
@ -186,15 +187,16 @@ func makeHeightSearchFunc(height int64) auto.SearchFunc {
// we were last and using the WAL to recover there // we were last and using the WAL to recover there
type Handshaker struct { type Handshaker struct {
state *sm.State stateDB dbm.DB
store types.BlockStore initialState sm.State
logger log.Logger store types.BlockStore
logger log.Logger
nBlocks int // number of blocks applied to the state nBlocks int // number of blocks applied to the state
} }
func NewHandshaker(state *sm.State, store types.BlockStore) *Handshaker { func NewHandshaker(stateDB dbm.DB, state sm.State, store types.BlockStore) *Handshaker {
return &Handshaker{state, store, log.NewNopLogger(), 0} return &Handshaker{stateDB, state, store, log.NewNopLogger(), 0}
} }
func (h *Handshaker) SetLogger(l log.Logger) { func (h *Handshaker) SetLogger(l log.Logger) {
@ -224,7 +226,7 @@ func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error {
// TODO: check version // TODO: check version
// replay blocks up to the latest in the blockstore // replay blocks up to the latest in the blockstore
_, err = h.ReplayBlocks(appHash, blockHeight, proxyApp) _, err = h.ReplayBlocks(h.initialState, appHash, blockHeight, proxyApp)
if err != nil { if err != nil {
return fmt.Errorf("Error on replay: %v", err) return fmt.Errorf("Error on replay: %v", err)
} }
@ -238,15 +240,15 @@ func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error {
// Replay all blocks since appBlockHeight and ensure the result matches the current state. // Replay all blocks since appBlockHeight and ensure the result matches the current state.
// Returns the final AppHash or an error // Returns the final AppHash or an error
func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int64, proxyApp proxy.AppConns) ([]byte, error) { func (h *Handshaker) ReplayBlocks(state sm.State, appHash []byte, appBlockHeight int64, proxyApp proxy.AppConns) ([]byte, error) {
storeBlockHeight := h.store.Height() storeBlockHeight := h.store.Height()
stateBlockHeight := h.state.LastBlockHeight stateBlockHeight := state.LastBlockHeight
h.logger.Info("ABCI Replay Blocks", "appHeight", appBlockHeight, "storeHeight", storeBlockHeight, "stateHeight", stateBlockHeight) h.logger.Info("ABCI Replay Blocks", "appHeight", appBlockHeight, "storeHeight", storeBlockHeight, "stateHeight", stateBlockHeight)
// If appBlockHeight == 0 it means that we are at genesis and hence should send InitChain // If appBlockHeight == 0 it means that we are at genesis and hence should send InitChain
if appBlockHeight == 0 { if appBlockHeight == 0 {
validators := types.TM2PB.Validators(h.state.Validators) validators := types.TM2PB.Validators(state.Validators)
if _, err := proxyApp.Consensus().InitChainSync(abci.RequestInitChain{validators}); err != nil { if _, err := proxyApp.Consensus().InitChainSync(abci.RequestInitChain{validators}); err != nil {
return nil, err return nil, err
} }
@ -254,7 +256,7 @@ func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int64, proxyApp
// First handle edge cases and constraints on the storeBlockHeight // First handle edge cases and constraints on the storeBlockHeight
if storeBlockHeight == 0 { if storeBlockHeight == 0 {
return appHash, h.checkAppHash(appHash) return appHash, checkAppHash(state, appHash)
} else if storeBlockHeight < appBlockHeight { } else if storeBlockHeight < appBlockHeight {
// the app should never be ahead of the store (but this is under app's control) // the app should never be ahead of the store (but this is under app's control)
@ -269,6 +271,7 @@ func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int64, proxyApp
cmn.PanicSanity(cmn.Fmt("StoreBlockHeight (%d) > StateBlockHeight + 1 (%d)", storeBlockHeight, stateBlockHeight+1)) cmn.PanicSanity(cmn.Fmt("StoreBlockHeight (%d) > StateBlockHeight + 1 (%d)", storeBlockHeight, stateBlockHeight+1))
} }
var err error
// Now either store is equal to state, or one ahead. // Now either store is equal to state, or one ahead.
// For each, consider all cases of where the app could be, given app <= store // For each, consider all cases of where the app could be, given app <= store
if storeBlockHeight == stateBlockHeight { if storeBlockHeight == stateBlockHeight {
@ -276,11 +279,11 @@ func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int64, proxyApp
// Either the app is asking for replay, or we're all synced up. // Either the app is asking for replay, or we're all synced up.
if appBlockHeight < storeBlockHeight { if appBlockHeight < storeBlockHeight {
// the app is behind, so replay blocks, but no need to go through WAL (state is already synced to store) // the app is behind, so replay blocks, but no need to go through WAL (state is already synced to store)
return h.replayBlocks(proxyApp, appBlockHeight, storeBlockHeight, false) return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, false)
} else if appBlockHeight == storeBlockHeight { } else if appBlockHeight == storeBlockHeight {
// We're good! // We're good!
return appHash, h.checkAppHash(appHash) return appHash, checkAppHash(state, appHash)
} }
} else if storeBlockHeight == stateBlockHeight+1 { } else if storeBlockHeight == stateBlockHeight+1 {
@ -289,7 +292,7 @@ func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int64, proxyApp
if appBlockHeight < stateBlockHeight { if appBlockHeight < stateBlockHeight {
// the app is further behind than it should be, so replay blocks // the app is further behind than it should be, so replay blocks
// but leave the last block to go through the WAL // but leave the last block to go through the WAL
return h.replayBlocks(proxyApp, appBlockHeight, storeBlockHeight, true) return h.replayBlocks(state, proxyApp, appBlockHeight, storeBlockHeight, true)
} else if appBlockHeight == stateBlockHeight { } else if appBlockHeight == stateBlockHeight {
// We haven't run Commit (both the state and app are one block behind), // We haven't run Commit (both the state and app are one block behind),
@ -297,17 +300,19 @@ func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int64, proxyApp
// NOTE: We could instead use the cs.WAL on cs.Start, // NOTE: We could instead use the cs.WAL on cs.Start,
// but we'd have to allow the WAL to replay a block that wrote it's ENDHEIGHT // but we'd have to allow the WAL to replay a block that wrote it's ENDHEIGHT
h.logger.Info("Replay last block using real app") h.logger.Info("Replay last block using real app")
return h.replayBlock(storeBlockHeight, proxyApp.Consensus()) state, err = h.replayBlock(state, storeBlockHeight, proxyApp.Consensus())
return state.AppHash, err
} else if appBlockHeight == storeBlockHeight { } else if appBlockHeight == storeBlockHeight {
// We ran Commit, but didn't save the state, so replayBlock with mock app // We ran Commit, but didn't save the state, so replayBlock with mock app
abciResponses, err := h.state.LoadABCIResponses(storeBlockHeight) abciResponses, err := sm.LoadABCIResponses(h.stateDB, storeBlockHeight)
if err != nil { if err != nil {
return nil, err return nil, err
} }
mockApp := newMockProxyApp(appHash, abciResponses) mockApp := newMockProxyApp(appHash, abciResponses)
h.logger.Info("Replay last block using mock app") h.logger.Info("Replay last block using mock app")
return h.replayBlock(storeBlockHeight, mockApp) state, err = h.replayBlock(state, storeBlockHeight, mockApp)
return state.AppHash, err
} }
} }
@ -316,7 +321,7 @@ func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int64, proxyApp
return nil, nil return nil, nil
} }
func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, storeBlockHeight int64, mutateState bool) ([]byte, error) { func (h *Handshaker) replayBlocks(state sm.State, proxyApp proxy.AppConns, appBlockHeight, storeBlockHeight int64, mutateState bool) ([]byte, error) {
// App is further behind than it should be, so we need to replay blocks. // App is further behind than it should be, so we need to replay blocks.
// We replay all blocks from appBlockHeight+1. // We replay all blocks from appBlockHeight+1.
// //
@ -336,7 +341,7 @@ func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, store
for i := appBlockHeight + 1; i <= finalBlock; i++ { for i := appBlockHeight + 1; i <= finalBlock; i++ {
h.logger.Info("Applying block", "height", i) h.logger.Info("Applying block", "height", i)
block := h.store.LoadBlock(i) block := h.store.LoadBlock(i)
appHash, err = sm.ExecCommitBlock(proxyApp.Consensus(), block, h.logger, h.state.LastValidators) appHash, err = sm.ExecCommitBlock(proxyApp.Consensus(), block, h.logger)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -346,33 +351,37 @@ func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, store
if mutateState { if mutateState {
// sync the final block // sync the final block
return h.replayBlock(storeBlockHeight, proxyApp.Consensus()) state, err = h.replayBlock(state, storeBlockHeight, proxyApp.Consensus())
if err != nil {
return nil, err
}
appHash = state.AppHash
} }
return appHash, h.checkAppHash(appHash) return appHash, checkAppHash(state, appHash)
} }
// ApplyBlock on the proxyApp with the last block. // ApplyBlock on the proxyApp with the last block.
func (h *Handshaker) replayBlock(height int64, proxyApp proxy.AppConnConsensus) ([]byte, error) { func (h *Handshaker) replayBlock(state sm.State, height int64, proxyApp proxy.AppConnConsensus) (sm.State, error) {
mempool := types.MockMempool{}
evpool := types.MockEvidencePool{}
block := h.store.LoadBlock(height) block := h.store.LoadBlock(height)
meta := h.store.LoadBlockMeta(height) meta := h.store.LoadBlockMeta(height)
if err := h.state.ApplyBlock(types.NopEventBus{}, proxyApp, blockExec := sm.NewBlockExecutor(h.stateDB, h.logger, proxyApp, types.MockMempool{}, types.MockEvidencePool{})
block, meta.BlockID.PartsHeader, mempool, evpool); err != nil {
return nil, err var err error
state, err = blockExec.ApplyBlock(state, meta.BlockID, block)
if err != nil {
return sm.State{}, err
} }
h.nBlocks += 1 h.nBlocks += 1
return h.state.AppHash, nil return state, nil
} }
func (h *Handshaker) checkAppHash(appHash []byte) error { func checkAppHash(state sm.State, appHash []byte) error {
if !bytes.Equal(h.state.AppHash, appHash) { if !bytes.Equal(state.AppHash, appHash) {
panic(fmt.Errorf("Tendermint state.AppHash does not match AppHash after replay. Got %X, expected %X", appHash, h.state.AppHash).Error()) panic(fmt.Errorf("Tendermint state.AppHash does not match AppHash after replay. Got %X, expected %X", appHash, state.AppHash).Error())
} }
return nil return nil
} }

View File

@ -18,6 +18,7 @@ import (
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
cmn "github.com/tendermint/tmlibs/common" cmn "github.com/tendermint/tmlibs/common"
dbm "github.com/tendermint/tmlibs/db" dbm "github.com/tendermint/tmlibs/db"
"github.com/tendermint/tmlibs/log"
) )
const ( const (
@ -104,11 +105,11 @@ type playback struct {
count int // how many lines/msgs into the file are we count int // how many lines/msgs into the file are we
// replays can be reset to beginning // replays can be reset to beginning
fileName string // so we can close/reopen the file fileName string // so we can close/reopen the file
genesisState *sm.State // so the replay session knows where to restart from genesisState sm.State // so the replay session knows where to restart from
} }
func newPlayback(fileName string, fp *os.File, cs *ConsensusState, genState *sm.State) *playback { func newPlayback(fileName string, fp *os.File, cs *ConsensusState, genState sm.State) *playback {
return &playback{ return &playback{
cs: cs, cs: cs,
fp: fp, fp: fp,
@ -123,7 +124,7 @@ func (pb *playback) replayReset(count int, newStepCh chan interface{}) error {
pb.cs.Stop() pb.cs.Stop()
pb.cs.Wait() pb.cs.Wait()
newCS := NewConsensusState(pb.cs.config, pb.genesisState.Copy(), pb.cs.proxyAppConn, newCS := NewConsensusState(pb.cs.config, pb.genesisState.Copy(), pb.cs.blockExec,
pb.cs.blockStore, pb.cs.mempool, pb.cs.evpool) pb.cs.blockStore, pb.cs.mempool, pb.cs.evpool)
newCS.SetEventBus(pb.cs.eventBus) newCS.SetEventBus(pb.cs.eventBus)
newCS.startForReplay() newCS.startForReplay()
@ -285,14 +286,14 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo
// Get State // Get State
stateDB := dbm.NewDB("state", config.DBBackend, config.DBDir()) stateDB := dbm.NewDB("state", config.DBBackend, config.DBDir())
state, err := sm.MakeGenesisStateFromFile(stateDB, config.GenesisFile()) state, err := sm.MakeGenesisStateFromFile(config.GenesisFile())
if err != nil { if err != nil {
cmn.Exit(err.Error()) cmn.Exit(err.Error())
} }
// Create proxyAppConn connection (consensus, mempool, query) // Create proxyAppConn connection (consensus, mempool, query)
clientCreator := proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir()) clientCreator := proxy.DefaultClientCreator(config.ProxyApp, config.ABCI, config.DBDir())
proxyApp := proxy.NewAppConns(clientCreator, NewHandshaker(state, blockStore)) proxyApp := proxy.NewAppConns(clientCreator, NewHandshaker(stateDB, state, blockStore))
err = proxyApp.Start() err = proxyApp.Start()
if err != nil { if err != nil {
cmn.Exit(cmn.Fmt("Error starting proxy app conns: %v", err)) cmn.Exit(cmn.Fmt("Error starting proxy app conns: %v", err))
@ -303,8 +304,11 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo
cmn.Exit(cmn.Fmt("Failed to start event bus: %v", err)) cmn.Exit(cmn.Fmt("Failed to start event bus: %v", err))
} }
consensusState := NewConsensusState(csConfig, state.Copy(), proxyApp.Consensus(), mempool, evpool := types.MockMempool{}, types.MockEvidencePool{}
blockStore, types.MockMempool{}, types.MockEvidencePool{}) blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool)
consensusState := NewConsensusState(csConfig, state.Copy(), blockExec,
blockStore, mempool, evpool)
consensusState.SetEventBus(eventBus) consensusState.SetEventBus(eventBus)
return consensusState return consensusState

View File

@ -53,8 +53,7 @@ func init() {
func startNewConsensusStateAndWaitForBlock(t *testing.T, lastBlockHeight int64, blockDB dbm.DB, stateDB dbm.DB) { func startNewConsensusStateAndWaitForBlock(t *testing.T, lastBlockHeight int64, blockDB dbm.DB, stateDB dbm.DB) {
logger := log.TestingLogger() logger := log.TestingLogger()
state, _ := sm.GetState(stateDB, consensusReplayConfig.GenesisFile()) state, _ := sm.LoadStateFromDBOrGenesisFile(stateDB, consensusReplayConfig.GenesisFile())
state.SetLogger(logger.With("module", "state"))
privValidator := loadPrivValidator(consensusReplayConfig) privValidator := loadPrivValidator(consensusReplayConfig)
cs := newConsensusStateWithConfigAndBlockStore(consensusReplayConfig, state, privValidator, dummy.NewDummyApplication(), blockDB) cs := newConsensusStateWithConfigAndBlockStore(consensusReplayConfig, state, privValidator, dummy.NewDummyApplication(), blockDB)
cs.SetLogger(logger) cs.SetLogger(logger)
@ -98,22 +97,22 @@ func sendTxs(cs *ConsensusState, ctx context.Context) {
func TestWALCrash(t *testing.T) { func TestWALCrash(t *testing.T) {
testCases := []struct { testCases := []struct {
name string name string
initFn func(*ConsensusState, context.Context) initFn func(dbm.DB, *ConsensusState, context.Context)
heightToStop int64 heightToStop int64
}{ }{
{"empty block", {"empty block",
func(cs *ConsensusState, ctx context.Context) {}, func(stateDB dbm.DB, cs *ConsensusState, ctx context.Context) {},
1}, 1},
{"block with a smaller part size", {"block with a smaller part size",
func(cs *ConsensusState, ctx context.Context) { func(stateDB dbm.DB, cs *ConsensusState, ctx context.Context) {
// XXX: is there a better way to change BlockPartSizeBytes? // XXX: is there a better way to change BlockPartSizeBytes?
cs.state.ConsensusParams.BlockPartSizeBytes = 512 cs.state.ConsensusParams.BlockPartSizeBytes = 512
cs.state.Save() sm.SaveState(stateDB, cs.state)
go sendTxs(cs, ctx) go sendTxs(cs, ctx)
}, },
1}, 1},
{"many non-empty blocks", {"many non-empty blocks",
func(cs *ConsensusState, ctx context.Context) { func(stateDB dbm.DB, cs *ConsensusState, ctx context.Context) {
go sendTxs(cs, ctx) go sendTxs(cs, ctx)
}, },
3}, 3},
@ -126,7 +125,7 @@ func TestWALCrash(t *testing.T) {
} }
} }
func crashWALandCheckLiveness(t *testing.T, initFn func(*ConsensusState, context.Context), heightToStop int64) { func crashWALandCheckLiveness(t *testing.T, initFn func(dbm.DB, *ConsensusState, context.Context), heightToStop int64) {
walPaniced := make(chan error) walPaniced := make(chan error)
crashingWal := &crashingWAL{panicCh: walPaniced, heightToStop: heightToStop} crashingWal := &crashingWAL{panicCh: walPaniced, heightToStop: heightToStop}
@ -139,8 +138,7 @@ LOOP:
// create consensus state from a clean slate // create consensus state from a clean slate
logger := log.NewNopLogger() logger := log.NewNopLogger()
stateDB := dbm.NewMemDB() stateDB := dbm.NewMemDB()
state, _ := sm.MakeGenesisStateFromFile(stateDB, consensusReplayConfig.GenesisFile()) state, _ := sm.MakeGenesisStateFromFile(consensusReplayConfig.GenesisFile())
state.SetLogger(logger.With("module", "state"))
privValidator := loadPrivValidator(consensusReplayConfig) privValidator := loadPrivValidator(consensusReplayConfig)
blockDB := dbm.NewMemDB() blockDB := dbm.NewMemDB()
cs := newConsensusStateWithConfigAndBlockStore(consensusReplayConfig, state, privValidator, dummy.NewDummyApplication(), blockDB) cs := newConsensusStateWithConfigAndBlockStore(consensusReplayConfig, state, privValidator, dummy.NewDummyApplication(), blockDB)
@ -148,7 +146,7 @@ LOOP:
// start sending transactions // start sending transactions
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
initFn(cs, ctx) initFn(stateDB, cs, ctx)
// clean up WAL file from the previous iteration // clean up WAL file from the previous iteration
walFile := cs.config.WalFile() walFile := cs.config.WalFile()
@ -344,12 +342,13 @@ func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) {
t.Fatalf(err.Error()) t.Fatalf(err.Error())
} }
state, store := stateAndStore(config, privVal.GetPubKey()) stateDB, state, store := stateAndStore(config, privVal.GetPubKey())
store.chain = chain store.chain = chain
store.commits = commits store.commits = commits
// run the chain through state.ApplyBlock to build up the tendermint state // run the chain through state.ApplyBlock to build up the tendermint state
latestAppHash := buildTMStateFromChain(config, state, chain, mode) state = buildTMStateFromChain(config, stateDB, state, chain, mode)
latestAppHash := state.AppHash
// make a new client creator // make a new client creator
dummyApp := dummy.NewPersistentDummyApplication(path.Join(config.DBDir(), "2")) dummyApp := dummy.NewPersistentDummyApplication(path.Join(config.DBDir(), "2"))
@ -358,12 +357,12 @@ func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) {
// run nBlocks against a new client to build up the app state. // run nBlocks against a new client to build up the app state.
// use a throwaway tendermint state // use a throwaway tendermint state
proxyApp := proxy.NewAppConns(clientCreator2, nil) proxyApp := proxy.NewAppConns(clientCreator2, nil)
state, _ := stateAndStore(config, privVal.GetPubKey()) stateDB, state, _ := stateAndStore(config, privVal.GetPubKey())
buildAppStateFromChain(proxyApp, state, chain, nBlocks, mode) buildAppStateFromChain(proxyApp, stateDB, state, chain, nBlocks, mode)
} }
// now start the app using the handshake - it should sync // now start the app using the handshake - it should sync
handshaker := NewHandshaker(state, store) handshaker := NewHandshaker(stateDB, state, store)
proxyApp := proxy.NewAppConns(clientCreator2, handshaker) proxyApp := proxy.NewAppConns(clientCreator2, handshaker)
if err := proxyApp.Start(); err != nil { if err := proxyApp.Start(); err != nil {
t.Fatalf("Error starting proxy app connections: %v", err) t.Fatalf("Error starting proxy app connections: %v", err)
@ -393,16 +392,20 @@ func testHandshakeReplay(t *testing.T, nBlocks int, mode uint) {
} }
} }
func applyBlock(st *sm.State, blk *types.Block, proxyApp proxy.AppConns) { func applyBlock(stateDB dbm.DB, st sm.State, blk *types.Block, proxyApp proxy.AppConns) sm.State {
testPartSize := st.ConsensusParams.BlockPartSizeBytes testPartSize := st.ConsensusParams.BlockPartSizeBytes
err := st.ApplyBlock(types.NopEventBus{}, proxyApp.Consensus(), blk, blk.MakePartSet(testPartSize).Header(), mempool, evpool) blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool)
blkID := types.BlockID{blk.Hash(), blk.MakePartSet(testPartSize).Header()}
newState, err := blockExec.ApplyBlock(st, blkID, blk)
if err != nil { if err != nil {
panic(err) panic(err)
} }
return newState
} }
func buildAppStateFromChain(proxyApp proxy.AppConns, func buildAppStateFromChain(proxyApp proxy.AppConns, stateDB dbm.DB,
state *sm.State, chain []*types.Block, nBlocks int, mode uint) { state sm.State, chain []*types.Block, nBlocks int, mode uint) {
// start a new app without handshake, play nBlocks blocks // start a new app without handshake, play nBlocks blocks
if err := proxyApp.Start(); err != nil { if err := proxyApp.Start(); err != nil {
panic(err) panic(err)
@ -418,24 +421,24 @@ func buildAppStateFromChain(proxyApp proxy.AppConns,
case 0: case 0:
for i := 0; i < nBlocks; i++ { for i := 0; i < nBlocks; i++ {
block := chain[i] block := chain[i]
applyBlock(state, block, proxyApp) state = applyBlock(stateDB, state, block, proxyApp)
} }
case 1, 2: case 1, 2:
for i := 0; i < nBlocks-1; i++ { for i := 0; i < nBlocks-1; i++ {
block := chain[i] block := chain[i]
applyBlock(state, block, proxyApp) state = applyBlock(stateDB, state, block, proxyApp)
} }
if mode == 2 { if mode == 2 {
// update the dummy height and apphash // update the dummy height and apphash
// as if we ran commit but not // as if we ran commit but not
applyBlock(state, chain[nBlocks-1], proxyApp) state = applyBlock(stateDB, state, chain[nBlocks-1], proxyApp)
} }
} }
} }
func buildTMStateFromChain(config *cfg.Config, state *sm.State, chain []*types.Block, mode uint) []byte { func buildTMStateFromChain(config *cfg.Config, stateDB dbm.DB, state sm.State, chain []*types.Block, mode uint) sm.State {
// run the whole chain against this client to build up the tendermint state // run the whole chain against this client to build up the tendermint state
clientCreator := proxy.NewLocalClientCreator(dummy.NewPersistentDummyApplication(path.Join(config.DBDir(), "1"))) clientCreator := proxy.NewLocalClientCreator(dummy.NewPersistentDummyApplication(path.Join(config.DBDir(), "1")))
proxyApp := proxy.NewAppConns(clientCreator, nil) // sm.NewHandshaker(config, state, store, ReplayLastBlock)) proxyApp := proxy.NewAppConns(clientCreator, nil) // sm.NewHandshaker(config, state, store, ReplayLastBlock))
@ -449,31 +452,26 @@ func buildTMStateFromChain(config *cfg.Config, state *sm.State, chain []*types.B
panic(err) panic(err)
} }
var latestAppHash []byte
switch mode { switch mode {
case 0: case 0:
// sync right up // sync right up
for _, block := range chain { for _, block := range chain {
applyBlock(state, block, proxyApp) state = applyBlock(stateDB, state, block, proxyApp)
} }
latestAppHash = state.AppHash
case 1, 2: case 1, 2:
// sync up to the penultimate as if we stored the block. // sync up to the penultimate as if we stored the block.
// whether we commit or not depends on the appHash // whether we commit or not depends on the appHash
for _, block := range chain[:len(chain)-1] { for _, block := range chain[:len(chain)-1] {
applyBlock(state, block, proxyApp) state = applyBlock(stateDB, state, block, proxyApp)
} }
// apply the final block to a state copy so we can // apply the final block to a state copy so we can
// get the right next appHash but keep the state back // get the right next appHash but keep the state back
stateCopy := state.Copy() applyBlock(stateDB, state, chain[len(chain)-1], proxyApp)
applyBlock(stateCopy, chain[len(chain)-1], proxyApp)
latestAppHash = stateCopy.AppHash
} }
return latestAppHash return state
} }
//-------------------------- //--------------------------
@ -587,13 +585,11 @@ func readPieceFromWAL(msg *TimedWALMessage) interface{} {
} }
// fresh state and mock store // fresh state and mock store
func stateAndStore(config *cfg.Config, pubKey crypto.PubKey) (*sm.State, *mockBlockStore) { func stateAndStore(config *cfg.Config, pubKey crypto.PubKey) (dbm.DB, sm.State, *mockBlockStore) {
stateDB := dbm.NewMemDB() stateDB := dbm.NewMemDB()
state, _ := sm.MakeGenesisStateFromFile(stateDB, config.GenesisFile()) state, _ := sm.MakeGenesisStateFromFile(config.GenesisFile())
state.SetLogger(log.TestingLogger().With("module", "state"))
store := NewMockBlockStore(config, state.ConsensusParams) store := NewMockBlockStore(config, state.ConsensusParams)
return state, store return stateDB, state, store
} }
//---------------------------------- //----------------------------------

View File

@ -17,7 +17,6 @@ import (
cfg "github.com/tendermint/tendermint/config" cfg "github.com/tendermint/tendermint/config"
cstypes "github.com/tendermint/tendermint/consensus/types" cstypes "github.com/tendermint/tendermint/consensus/types"
"github.com/tendermint/tendermint/proxy"
sm "github.com/tendermint/tendermint/state" sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
@ -75,15 +74,16 @@ type ConsensusState struct {
privValidator types.PrivValidator // for signing votes privValidator types.PrivValidator // for signing votes
// services for creating and executing blocks // services for creating and executing blocks
proxyAppConn proxy.AppConnConsensus // TODO: encapsulate all of this in one "BlockManager"
blockStore types.BlockStore blockExec *sm.BlockExecutor
mempool types.Mempool blockStore types.BlockStore
evpool types.EvidencePool mempool types.Mempool
evpool types.EvidencePool
// internal state // internal state
mtx sync.Mutex mtx sync.Mutex
cstypes.RoundState cstypes.RoundState
state *sm.State // State until height-1. state sm.State // State until height-1.
// state changes may be triggered by msgs from peers, // state changes may be triggered by msgs from peers,
// msgs from ourself, or by timeouts // msgs from ourself, or by timeouts
@ -114,10 +114,10 @@ type ConsensusState struct {
} }
// NewConsensusState returns a new ConsensusState. // NewConsensusState returns a new ConsensusState.
func NewConsensusState(config *cfg.ConsensusConfig, state *sm.State, proxyAppConn proxy.AppConnConsensus, blockStore types.BlockStore, mempool types.Mempool, evpool types.EvidencePool) *ConsensusState { func NewConsensusState(config *cfg.ConsensusConfig, state sm.State, blockExec *sm.BlockExecutor, blockStore types.BlockStore, mempool types.Mempool, evpool types.EvidencePool) *ConsensusState {
cs := &ConsensusState{ cs := &ConsensusState{
config: config, config: config,
proxyAppConn: proxyAppConn, blockExec: blockExec,
blockStore: blockStore, blockStore: blockStore,
mempool: mempool, mempool: mempool,
peerMsgQueue: make(chan msgInfo, msgQueueSize), peerMsgQueue: make(chan msgInfo, msgQueueSize),
@ -153,6 +153,7 @@ func (cs *ConsensusState) SetLogger(l log.Logger) {
// SetEventBus sets event bus. // SetEventBus sets event bus.
func (cs *ConsensusState) SetEventBus(b *types.EventBus) { func (cs *ConsensusState) SetEventBus(b *types.EventBus) {
cs.eventBus = b cs.eventBus = b
cs.blockExec.SetEventBus(b)
} }
// String returns a string. // String returns a string.
@ -162,7 +163,7 @@ func (cs *ConsensusState) String() string {
} }
// GetState returns a copy of the chain state. // GetState returns a copy of the chain state.
func (cs *ConsensusState) GetState() *sm.State { func (cs *ConsensusState) GetState() sm.State {
cs.mtx.Lock() cs.mtx.Lock()
defer cs.mtx.Unlock() defer cs.mtx.Unlock()
return cs.state.Copy() return cs.state.Copy()
@ -399,7 +400,7 @@ func (cs *ConsensusState) sendInternalMessage(mi msgInfo) {
// Reconstruct LastCommit from SeenCommit, which we saved along with the block, // Reconstruct LastCommit from SeenCommit, which we saved along with the block,
// (which happens even before saving the state) // (which happens even before saving the state)
func (cs *ConsensusState) reconstructLastCommit(state *sm.State) { func (cs *ConsensusState) reconstructLastCommit(state sm.State) {
if state.LastBlockHeight == 0 { if state.LastBlockHeight == 0 {
return return
} }
@ -422,12 +423,12 @@ func (cs *ConsensusState) reconstructLastCommit(state *sm.State) {
// Updates ConsensusState and increments height to match that of state. // Updates ConsensusState and increments height to match that of state.
// The round becomes 0 and cs.Step becomes cstypes.RoundStepNewHeight. // The round becomes 0 and cs.Step becomes cstypes.RoundStepNewHeight.
func (cs *ConsensusState) updateToState(state *sm.State) { func (cs *ConsensusState) updateToState(state sm.State) {
if cs.CommitRound > -1 && 0 < cs.Height && cs.Height != state.LastBlockHeight { if cs.CommitRound > -1 && 0 < cs.Height && cs.Height != state.LastBlockHeight {
cmn.PanicSanity(cmn.Fmt("updateToState() expected state height of %v but found %v", cmn.PanicSanity(cmn.Fmt("updateToState() expected state height of %v but found %v",
cs.Height, state.LastBlockHeight)) cs.Height, state.LastBlockHeight))
} }
if cs.state != nil && cs.state.LastBlockHeight+1 != cs.Height { if !cs.state.IsEmpty() && cs.state.LastBlockHeight+1 != cs.Height {
// This might happen when someone else is mutating cs.state. // This might happen when someone else is mutating cs.state.
// Someone forgot to pass in state.Copy() somewhere?! // Someone forgot to pass in state.Copy() somewhere?!
cmn.PanicSanity(cmn.Fmt("Inconsistent cs.state.LastBlockHeight+1 %v vs cs.Height %v", cmn.PanicSanity(cmn.Fmt("Inconsistent cs.state.LastBlockHeight+1 %v vs cs.Height %v",
@ -437,7 +438,7 @@ func (cs *ConsensusState) updateToState(state *sm.State) {
// If state isn't further out than cs.state, just ignore. // If state isn't further out than cs.state, just ignore.
// This happens when SwitchToConsensus() is called in the reactor. // This happens when SwitchToConsensus() is called in the reactor.
// We don't want to reset e.g. the Votes. // We don't want to reset e.g. the Votes.
if cs.state != nil && (state.LastBlockHeight <= cs.state.LastBlockHeight) { if !cs.state.IsEmpty() && (state.LastBlockHeight <= cs.state.LastBlockHeight) {
cs.Logger.Info("Ignoring updateToState()", "newHeight", state.LastBlockHeight+1, "oldHeight", cs.state.LastBlockHeight+1) cs.Logger.Info("Ignoring updateToState()", "newHeight", state.LastBlockHeight+1, "oldHeight", cs.state.LastBlockHeight+1)
return return
} }
@ -922,7 +923,7 @@ func (cs *ConsensusState) defaultDoPrevote(height int64, round int) {
} }
// Validate proposal block // Validate proposal block
err := cs.state.ValidateBlock(cs.ProposalBlock) err := cs.blockExec.ValidateBlock(cs.state, cs.ProposalBlock)
if err != nil { if err != nil {
// ProposalBlock is invalid, prevote nil. // ProposalBlock is invalid, prevote nil.
logger.Error("enterPrevote: ProposalBlock is invalid", "err", err) logger.Error("enterPrevote: ProposalBlock is invalid", "err", err)
@ -1030,7 +1031,7 @@ func (cs *ConsensusState) enterPrecommit(height int64, round int) {
if cs.ProposalBlock.HashesTo(blockID.Hash) { if cs.ProposalBlock.HashesTo(blockID.Hash) {
cs.Logger.Info("enterPrecommit: +2/3 prevoted proposal block. Locking", "hash", blockID.Hash) cs.Logger.Info("enterPrecommit: +2/3 prevoted proposal block. Locking", "hash", blockID.Hash)
// Validate the block. // Validate the block.
if err := cs.state.ValidateBlock(cs.ProposalBlock); err != nil { if err := cs.blockExec.ValidateBlock(cs.state, cs.ProposalBlock); err != nil {
cmn.PanicConsensus(cmn.Fmt("enterPrecommit: +2/3 prevoted for an invalid block: %v", err)) cmn.PanicConsensus(cmn.Fmt("enterPrecommit: +2/3 prevoted for an invalid block: %v", err))
} }
cs.LockedRound = round cs.LockedRound = round
@ -1165,7 +1166,7 @@ func (cs *ConsensusState) finalizeCommit(height int64) {
if !block.HashesTo(blockID.Hash) { if !block.HashesTo(blockID.Hash) {
cmn.PanicSanity(cmn.Fmt("Cannot finalizeCommit, ProposalBlock does not hash to commit hash")) cmn.PanicSanity(cmn.Fmt("Cannot finalizeCommit, ProposalBlock does not hash to commit hash"))
} }
if err := cs.state.ValidateBlock(block); err != nil { if err := cs.blockExec.ValidateBlock(cs.state, block); err != nil {
cmn.PanicConsensus(cmn.Fmt("+2/3 committed an invalid block: %v", err)) cmn.PanicConsensus(cmn.Fmt("+2/3 committed an invalid block: %v", err))
} }
@ -1203,14 +1204,11 @@ func (cs *ConsensusState) finalizeCommit(height int64) {
// Create a copy of the state for staging // Create a copy of the state for staging
// and an event cache for txs // and an event cache for txs
stateCopy := cs.state.Copy() stateCopy := cs.state.Copy()
txEventBuffer := types.NewTxEventBuffer(cs.eventBus, int(block.NumTxs))
// Execute and commit the block, update and save the state, and update the mempool. // Execute and commit the block, update and save the state, and update the mempool.
// All calls to the proxyAppConn come here.
// NOTE: the block.AppHash wont reflect these txs until the next block // NOTE: the block.AppHash wont reflect these txs until the next block
err := stateCopy.ApplyBlock(txEventBuffer, cs.proxyAppConn, var err error
block, blockParts.Header(), stateCopy, err = cs.blockExec.ApplyBlock(stateCopy, types.BlockID{block.Hash(), blockParts.Header()}, block)
cs.mempool, cs.evpool)
if err != nil { if err != nil {
cs.Logger.Error("Error on ApplyBlock. Did the application crash? Please restart tendermint", "err", err) cs.Logger.Error("Error on ApplyBlock. Did the application crash? Please restart tendermint", "err", err)
err := cmn.Kill() err := cmn.Kill()
@ -1222,22 +1220,6 @@ func (cs *ConsensusState) finalizeCommit(height int64) {
fail.Fail() // XXX fail.Fail() // XXX
// Fire event for new block.
// NOTE: If we fail before firing, these events will never fire
//
// TODO: Either
// * Fire before persisting state, in ApplyBlock
// * Fire on start up if we haven't written any new WAL msgs
// Both options mean we may fire more than once. Is that fine ?
cs.eventBus.PublishEventNewBlock(types.EventDataNewBlock{block})
cs.eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{block.Header})
err = txEventBuffer.Flush()
if err != nil {
cs.Logger.Error("Failed to flush event buffer", "err", err)
}
fail.Fail() // XXX
// NewHeightStep! // NewHeightStep!
cs.updateToState(stateCopy) cs.updateToState(stateCopy)

View File

@ -47,13 +47,12 @@ func WALWithNBlocks(numBlocks int) (data []byte, err error) {
} }
stateDB := db.NewMemDB() stateDB := db.NewMemDB()
blockStoreDB := db.NewMemDB() blockStoreDB := db.NewMemDB()
state, err := sm.MakeGenesisState(stateDB, genDoc) state, err := sm.MakeGenesisState(genDoc)
state.SetLogger(logger.With("module", "state"))
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to make genesis state") return nil, errors.Wrap(err, "failed to make genesis state")
} }
blockStore := bc.NewBlockStore(blockStoreDB) blockStore := bc.NewBlockStore(blockStoreDB)
handshaker := NewHandshaker(state, blockStore) handshaker := NewHandshaker(stateDB, state, blockStore)
proxyApp := proxy.NewAppConns(proxy.NewLocalClientCreator(app), handshaker) proxyApp := proxy.NewAppConns(proxy.NewLocalClientCreator(app), handshaker)
proxyApp.SetLogger(logger.With("module", "proxy")) proxyApp.SetLogger(logger.With("module", "proxy"))
if err := proxyApp.Start(); err != nil { if err := proxyApp.Start(); err != nil {
@ -68,7 +67,8 @@ func WALWithNBlocks(numBlocks int) (data []byte, err error) {
defer eventBus.Stop() defer eventBus.Stop()
mempool := types.MockMempool{} mempool := types.MockMempool{}
evpool := types.MockEvidencePool{} evpool := types.MockEvidencePool{}
consensusState := NewConsensusState(config.Consensus, state.Copy(), proxyApp.Consensus(), blockStore, mempool, evpool) blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool)
consensusState := NewConsensusState(config.Consensus, state.Copy(), blockExec, blockStore, mempool, evpool)
consensusState.SetLogger(logger) consensusState.SetLogger(logger)
consensusState.SetEventBus(eventBus) consensusState.SetEventBus(eventBus)
if privValidator != nil { if privValidator != nil {

View File

@ -1,30 +1,40 @@
package evidence package evidence
import ( import (
"fmt"
"sync"
dbm "github.com/tendermint/tmlibs/db"
"github.com/tendermint/tmlibs/log" "github.com/tendermint/tmlibs/log"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
// EvidencePool maintains a pool of valid evidence // EvidencePool maintains a pool of valid evidence
// in an EvidenceStore. // in an EvidenceStore.
type EvidencePool struct { type EvidencePool struct {
params types.EvidenceParams
logger log.Logger logger log.Logger
state types.State // TODO: update this on commit!
evidenceStore *EvidenceStore evidenceStore *EvidenceStore
// needed to load validators to verify evidence
stateDB dbm.DB
// latest state
mtx sync.Mutex
state sm.State
// never close // never close
evidenceChan chan types.Evidence evidenceChan chan types.Evidence
} }
func NewEvidencePool(params types.EvidenceParams, evidenceStore *EvidenceStore, state types.State) *EvidencePool { func NewEvidencePool(stateDB dbm.DB, evidenceStore *EvidenceStore) *EvidencePool {
evpool := &EvidencePool{ evpool := &EvidencePool{
params: params, stateDB: stateDB,
state: sm.LoadState(stateDB),
logger: log.NewNopLogger(), logger: log.NewNopLogger(),
evidenceStore: evidenceStore, evidenceStore: evidenceStore,
state: state,
evidenceChan: make(chan types.Evidence), evidenceChan: make(chan types.Evidence),
} }
return evpool return evpool
@ -50,19 +60,45 @@ func (evpool *EvidencePool) PendingEvidence() []types.Evidence {
return evpool.evidenceStore.PendingEvidence() return evpool.evidenceStore.PendingEvidence()
} }
// State returns the current state of the evpool.
func (evpool *EvidencePool) State() sm.State {
evpool.mtx.Lock()
defer evpool.mtx.Unlock()
return evpool.state
}
// Update loads the latest
func (evpool *EvidencePool) Update(block *types.Block) {
evpool.mtx.Lock()
defer evpool.mtx.Unlock()
state := sm.LoadState(evpool.stateDB)
if state.LastBlockHeight != block.Height {
panic(fmt.Sprintf("EvidencePool.Update: loaded state with height %d when block.Height=%d", state.LastBlockHeight, block.Height))
}
evpool.state = state
// NOTE: shouldn't need the mutex
evpool.MarkEvidenceAsCommitted(block.Evidence.Evidence)
}
// AddEvidence checks the evidence is valid and adds it to the pool. // AddEvidence checks the evidence is valid and adds it to the pool.
// Blocks on the EvidenceChan. // Blocks on the EvidenceChan.
func (evpool *EvidencePool) AddEvidence(evidence types.Evidence) (err error) { func (evpool *EvidencePool) AddEvidence(evidence types.Evidence) (err error) {
// TODO: check if we already have evidence for this // TODO: check if we already have evidence for this
// validator at this height so we dont get spammed // validator at this height so we dont get spammed
priority, err := evpool.state.VerifyEvidence(evidence) if err := sm.VerifyEvidence(evpool.stateDB, evpool.State(), evidence); err != nil {
if err != nil {
// TODO: if err is just that we cant find it cuz we pruned, ignore.
// TODO: if its actually bad evidence, punish peer
return err return err
} }
// fetch the validator and return its voting power as its priority
// TODO: something better ?
valset, _ := sm.LoadValidators(evpool.stateDB, evidence.Height())
_, val := valset.GetByAddress(evidence.Address())
priority := val.VotingPower
added := evpool.evidenceStore.AddNewEvidence(evidence, priority) added := evpool.evidenceStore.AddNewEvidence(evidence, priority)
if !added { if !added {
// evidence already known, just ignore // evidence already known, just ignore

View File

@ -3,29 +3,57 @@ package evidence
import ( import (
"sync" "sync"
"testing" "testing"
"time"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
dbm "github.com/tendermint/tmlibs/db" dbm "github.com/tendermint/tmlibs/db"
) )
type mockState struct{} var mockState = sm.State{}
func (m mockState) VerifyEvidence(ev types.Evidence) (int64, error) { func initializeValidatorState(valAddr []byte, height int64) dbm.DB {
err := ev.Verify("") stateDB := dbm.NewMemDB()
return 10, err
// create validator set and state
valSet := &types.ValidatorSet{
Validators: []*types.Validator{
{Address: valAddr},
},
}
state := sm.State{
LastBlockHeight: 0,
LastBlockTime: time.Now(),
Validators: valSet,
LastHeightValidatorsChanged: 1,
ConsensusParams: types.ConsensusParams{
EvidenceParams: types.EvidenceParams{
MaxAge: 1000000,
},
},
}
// save all states up to height
for i := int64(0); i < height; i++ {
state.LastBlockHeight = i
sm.SaveState(stateDB, state)
}
return stateDB
} }
func TestEvidencePool(t *testing.T) { func TestEvidencePool(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
params := types.EvidenceParams{} valAddr := []byte("val1")
height := int64(5)
stateDB := initializeValidatorState(valAddr, height)
store := NewEvidenceStore(dbm.NewMemDB()) store := NewEvidenceStore(dbm.NewMemDB())
state := mockState{} pool := NewEvidencePool(stateDB, store)
pool := NewEvidencePool(params, store, state)
goodEvidence := newMockGoodEvidence(5, 1, []byte("val1")) goodEvidence := newMockGoodEvidence(height, 0, valAddr)
badEvidence := MockBadEvidence{goodEvidence} badEvidence := MockBadEvidence{goodEvidence}
err := pool.AddEvidence(badEvidence) err := pool.AddEvidence(badEvidence)

View File

@ -32,15 +32,14 @@ func evidenceLogger() log.Logger {
} }
// connect N evidence reactors through N switches // connect N evidence reactors through N switches
func makeAndConnectEvidenceReactors(config *cfg.Config, N int) []*EvidenceReactor { func makeAndConnectEvidenceReactors(config *cfg.Config, stateDBs []dbm.DB) []*EvidenceReactor {
N := len(stateDBs)
reactors := make([]*EvidenceReactor, N) reactors := make([]*EvidenceReactor, N)
logger := evidenceLogger() logger := evidenceLogger()
for i := 0; i < N; i++ { for i := 0; i < N; i++ {
params := types.EvidenceParams{}
store := NewEvidenceStore(dbm.NewMemDB()) store := NewEvidenceStore(dbm.NewMemDB())
state := mockState{} pool := NewEvidencePool(stateDBs[i], store)
pool := NewEvidencePool(params, store, state)
reactors[i] = NewEvidenceReactor(pool) reactors[i] = NewEvidenceReactor(pool)
reactors[i].SetLogger(logger.With("validator", i)) reactors[i].SetLogger(logger.With("validator", i))
} }
@ -99,10 +98,10 @@ func _waitForEvidence(t *testing.T, wg *sync.WaitGroup, evs types.EvidenceList,
wg.Done() wg.Done()
} }
func sendEvidence(t *testing.T, evpool *EvidencePool, n int) types.EvidenceList { func sendEvidence(t *testing.T, evpool *EvidencePool, valAddr []byte, n int) types.EvidenceList {
evList := make([]types.Evidence, n) evList := make([]types.Evidence, n)
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
ev := newMockGoodEvidence(int64(i), 2, []byte("val")) ev := newMockGoodEvidence(int64(i+1), 0, valAddr)
err := evpool.AddEvidence(ev) err := evpool.AddEvidence(ev)
assert.Nil(t, err) assert.Nil(t, err)
evList[i] = ev evList[i] = ev
@ -111,17 +110,28 @@ func sendEvidence(t *testing.T, evpool *EvidencePool, n int) types.EvidenceList
} }
var ( var (
NUM_EVIDENCE = 1000 NUM_EVIDENCE = 1
TIMEOUT = 120 * time.Second // ridiculously high because CircleCI is slow TIMEOUT = 120 * time.Second // ridiculously high because CircleCI is slow
) )
func TestReactorBroadcastEvidence(t *testing.T) { func TestReactorBroadcastEvidence(t *testing.T) {
config := cfg.TestConfig() config := cfg.TestConfig()
N := 7 N := 7
reactors := makeAndConnectEvidenceReactors(config, N)
// send a bunch of evidence to the first reactor's evpool // create statedb for everyone
stateDBs := make([]dbm.DB, N)
valAddr := []byte("myval")
// we need validators saved for heights at least as high as we have evidence for
height := int64(NUM_EVIDENCE) + 10
for i := 0; i < N; i++ {
stateDBs[i] = initializeValidatorState(valAddr, height)
}
// make reactors from statedb
reactors := makeAndConnectEvidenceReactors(config, stateDBs)
// send a bunch of valid evidence to the first reactor's evpool
// and wait for them all to be received in the others // and wait for them all to be received in the others
evList := sendEvidence(t, reactors[0].evpool, NUM_EVIDENCE) evList := sendEvidence(t, reactors[0].evpool, valAddr, NUM_EVIDENCE)
waitForEvidence(t, evList, reactors) waitForEvidence(t, evList, reactors)
} }

View File

@ -113,12 +113,14 @@ func TestStorePriority(t *testing.T) {
//------------------------------------------- //-------------------------------------------
const ( const (
evidenceTypeMock = byte(0x01) evidenceTypeMockGood = byte(0x01)
evidenceTypeMockBad = byte(0x02)
) )
var _ = wire.RegisterInterface( var _ = wire.RegisterInterface(
struct{ types.Evidence }{}, struct{ types.Evidence }{},
wire.ConcreteType{MockGoodEvidence{}, evidenceTypeMock}, wire.ConcreteType{MockGoodEvidence{}, evidenceTypeMockGood},
wire.ConcreteType{MockBadEvidence{}, evidenceTypeMockBad},
) )
type MockGoodEvidence struct { type MockGoodEvidence struct {

View File

@ -102,7 +102,8 @@ type Node struct {
trustMetricStore *trust.TrustMetricStore // trust metrics for all peers trustMetricStore *trust.TrustMetricStore // trust metrics for all peers
// services // services
eventBus *types.EventBus // pub/sub for services eventBus *types.EventBus // pub/sub for services
stateDB dbm.DB
blockStore *bc.BlockStore // store the blockchain to disk blockStore *bc.BlockStore // store the blockchain to disk
bcReactor *bc.BlockchainReactor // for fast-syncing bcReactor *bc.BlockchainReactor // for fast-syncing
mempoolReactor *mempl.MempoolReactor // for gossipping transactions mempoolReactor *mempl.MempoolReactor // for gossipping transactions
@ -137,6 +138,7 @@ func NewNode(config *cfg.Config,
} }
// Get genesis doc // Get genesis doc
// TODO: move to state package?
genDoc, err := loadGenesisDoc(stateDB) genDoc, err := loadGenesisDoc(stateDB)
if err != nil { if err != nil {
genDoc, err = genesisDocProvider() genDoc, err = genesisDocProvider()
@ -148,21 +150,16 @@ func NewNode(config *cfg.Config,
saveGenesisDoc(stateDB, genDoc) saveGenesisDoc(stateDB, genDoc)
} }
stateLogger := logger.With("module", "state") state, err := sm.LoadStateFromDBOrGenesisDoc(stateDB, genDoc)
state := sm.LoadState(stateDB) if err != nil {
if state == nil { return nil, err
state, err = sm.MakeGenesisState(stateDB, genDoc)
if err != nil {
return nil, err
}
state.Save()
} }
state.SetLogger(stateLogger)
// Create the proxyApp, which manages connections (consensus, mempool, query) // Create the proxyApp, which manages connections (consensus, mempool, query)
// and sync tendermint and the app by replaying any necessary blocks // and sync tendermint and the app by performing a handshake
// and replaying any necessary blocks
consensusLogger := logger.With("module", "consensus") consensusLogger := logger.With("module", "consensus")
handshaker := consensus.NewHandshaker(state, blockStore) handshaker := consensus.NewHandshaker(stateDB, state, blockStore)
handshaker.SetLogger(consensusLogger) handshaker.SetLogger(consensusLogger)
proxyApp := proxy.NewAppConns(clientCreator, handshaker) proxyApp := proxy.NewAppConns(clientCreator, handshaker)
proxyApp.SetLogger(logger.With("module", "proxy")) proxyApp.SetLogger(logger.With("module", "proxy"))
@ -172,7 +169,6 @@ func NewNode(config *cfg.Config,
// reload the state (it may have been updated by the handshake) // reload the state (it may have been updated by the handshake)
state = sm.LoadState(stateDB) state = sm.LoadState(stateDB)
state.SetLogger(stateLogger)
// Generate node PrivKey // Generate node PrivKey
privKey := crypto.GenPrivKeyEd25519() privKey := crypto.GenPrivKeyEd25519()
@ -194,10 +190,6 @@ func NewNode(config *cfg.Config,
consensusLogger.Info("This node is not a validator") consensusLogger.Info("This node is not a validator")
} }
// Make BlockchainReactor
bcReactor := bc.NewBlockchainReactor(state.Copy(), proxyApp.Consensus(), blockStore, fastSync)
bcReactor.SetLogger(logger.With("module", "blockchain"))
// Make MempoolReactor // Make MempoolReactor
mempoolLogger := logger.With("module", "mempool") mempoolLogger := logger.With("module", "mempool")
mempool := mempl.NewMempool(config.Mempool, proxyApp.Mempool(), state.LastBlockHeight) mempool := mempl.NewMempool(config.Mempool, proxyApp.Mempool(), state.LastBlockHeight)
@ -216,14 +208,22 @@ func NewNode(config *cfg.Config,
} }
evidenceLogger := logger.With("module", "evidence") evidenceLogger := logger.With("module", "evidence")
evidenceStore := evidence.NewEvidenceStore(evidenceDB) evidenceStore := evidence.NewEvidenceStore(evidenceDB)
evidencePool := evidence.NewEvidencePool(state.ConsensusParams.EvidenceParams, evidenceStore, state.Copy()) evidencePool := evidence.NewEvidencePool(stateDB, evidenceStore)
evidencePool.SetLogger(evidenceLogger) evidencePool.SetLogger(evidenceLogger)
evidenceReactor := evidence.NewEvidenceReactor(evidencePool) evidenceReactor := evidence.NewEvidenceReactor(evidencePool)
evidenceReactor.SetLogger(evidenceLogger) evidenceReactor.SetLogger(evidenceLogger)
blockExecLogger := logger.With("module", "state")
// make block executor for consensus and blockchain reactors to execute blocks
blockExec := sm.NewBlockExecutor(stateDB, blockExecLogger, proxyApp.Consensus(), mempool, evidencePool)
// Make BlockchainReactor
bcReactor := bc.NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
bcReactor.SetLogger(logger.With("module", "blockchain"))
// Make ConsensusReactor // Make ConsensusReactor
consensusState := consensus.NewConsensusState(config.Consensus, state.Copy(), consensusState := consensus.NewConsensusState(config.Consensus, state.Copy(),
proxyApp.Consensus(), blockStore, mempool, evidencePool) blockExec, blockStore, mempool, evidencePool)
consensusState.SetLogger(consensusLogger) consensusState.SetLogger(consensusLogger)
if privValidator != nil { if privValidator != nil {
consensusState.SetPrivValidator(privValidator) consensusState.SetPrivValidator(privValidator)
@ -291,7 +291,7 @@ func NewNode(config *cfg.Config,
eventBus.SetLogger(logger.With("module", "events")) eventBus.SetLogger(logger.With("module", "events"))
// services which will be publishing and/or subscribing for messages (events) // services which will be publishing and/or subscribing for messages (events)
bcReactor.SetEventBus(eventBus) // consensusReactor will set it on consensusState and blockExecutor
consensusReactor.SetEventBus(eventBus) consensusReactor.SetEventBus(eventBus)
// Transaction indexing // Transaction indexing
@ -333,6 +333,7 @@ func NewNode(config *cfg.Config,
addrBook: addrBook, addrBook: addrBook,
trustMetricStore: trustMetricStore, trustMetricStore: trustMetricStore,
stateDB: stateDB,
blockStore: blockStore, blockStore: blockStore,
bcReactor: bcReactor, bcReactor: bcReactor,
mempoolReactor: mempoolReactor, mempoolReactor: mempoolReactor,
@ -429,6 +430,7 @@ func (n *Node) AddListener(l p2p.Listener) {
// ConfigureRPC sets all variables in rpccore so they will serve // ConfigureRPC sets all variables in rpccore so they will serve
// rpc calls from this node // rpc calls from this node
func (n *Node) ConfigureRPC() { func (n *Node) ConfigureRPC() {
rpccore.SetStateDB(n.stateDB)
rpccore.SetBlockStore(n.blockStore) rpccore.SetBlockStore(n.blockStore)
rpccore.SetConsensusState(n.consensusState) rpccore.SetConsensusState(n.consensusState)
rpccore.SetMempool(n.mempoolReactor.Mempool) rpccore.SetMempool(n.mempoolReactor.Mempool)

View File

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
ctypes "github.com/tendermint/tendermint/rpc/core/types" ctypes "github.com/tendermint/tendermint/rpc/core/types"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
cmn "github.com/tendermint/tmlibs/common" cmn "github.com/tendermint/tmlibs/common"
) )
@ -336,8 +337,7 @@ func BlockResults(heightPtr *int64) (*ctypes.ResultBlockResults, error) {
} }
// load the results // load the results
state := consensusState.GetState() results, err := sm.LoadABCIResponses(stateDB, height)
results, err := state.LoadABCIResponses(height)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -4,6 +4,7 @@ import (
cm "github.com/tendermint/tendermint/consensus" cm "github.com/tendermint/tendermint/consensus"
cstypes "github.com/tendermint/tendermint/consensus/types" cstypes "github.com/tendermint/tendermint/consensus/types"
ctypes "github.com/tendermint/tendermint/rpc/core/types" ctypes "github.com/tendermint/tendermint/rpc/core/types"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
@ -49,8 +50,7 @@ func Validators(heightPtr *int64) (*ctypes.ResultValidators, error) {
return nil, err return nil, err
} }
state := consensusState.GetState() validators, err := sm.LoadValidators(stateDB, height)
validators, err := state.LoadValidators(height)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -11,6 +11,7 @@ import (
sm "github.com/tendermint/tendermint/state" sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/state/txindex" "github.com/tendermint/tendermint/state/txindex"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
dbm "github.com/tendermint/tmlibs/db"
"github.com/tendermint/tmlibs/log" "github.com/tendermint/tmlibs/log"
) )
@ -20,7 +21,7 @@ var subscribeTimeout = 5 * time.Second
// These interfaces are used by RPC and must be thread safe // These interfaces are used by RPC and must be thread safe
type Consensus interface { type Consensus interface {
GetState() *sm.State GetState() sm.State
GetValidators() (int64, []*types.Validator) GetValidators() (int64, []*types.Validator)
GetRoundState() *cstypes.RoundState GetRoundState() *cstypes.RoundState
} }
@ -43,6 +44,7 @@ var (
proxyAppQuery proxy.AppConnQuery proxyAppQuery proxy.AppConnQuery
// interfaces defined in types and above // interfaces defined in types and above
stateDB dbm.DB
blockStore types.BlockStore blockStore types.BlockStore
mempool types.Mempool mempool types.Mempool
evidencePool types.EvidencePool evidencePool types.EvidencePool
@ -60,6 +62,10 @@ var (
logger log.Logger logger log.Logger
) )
func SetStateDB(db dbm.DB) {
stateDB = db
}
func SetBlockStore(bs types.BlockStore) { func SetBlockStore(bs types.BlockStore) {
blockStore = bs blockStore = bs
} }

View File

@ -1,7 +1,6 @@
package state package state
import ( import (
"bytes"
"errors" "errors"
"fmt" "fmt"
@ -10,36 +9,154 @@ import (
crypto "github.com/tendermint/go-crypto" crypto "github.com/tendermint/go-crypto"
"github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/proxy"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
dbm "github.com/tendermint/tmlibs/db"
"github.com/tendermint/tmlibs/log" "github.com/tendermint/tmlibs/log"
) )
//-------------------------------------------------- //-----------------------------------------------------------------------------
// Execute the block // BlockExecutor handles block execution and state updates.
// It exposes ApplyBlock(), which validates & executes the block, updates state w/ ABCI responses,
// then commits and updates the mempool atomically, then saves state.
// ValExecBlock executes the block, but does NOT mutate State. // BlockExecutor provides the context and accessories for properly executing a block.
// + validates the block type BlockExecutor struct {
// + executes block.Txs on the proxyAppConn // save state, validators, consensus params, abci responses here
func (s *State) ValExecBlock(txEventPublisher types.TxEventPublisher, proxyAppConn proxy.AppConnConsensus, block *types.Block) (*ABCIResponses, error) { db dbm.DB
// Validate the block.
if err := s.validateBlock(block); err != nil {
return nil, ErrInvalidBlock(err)
}
// Execute the block txs // execute the app against this
abciResponses, err := execBlockOnProxyApp(txEventPublisher, proxyAppConn, block, s.logger, s.LastValidators) proxyApp proxy.AppConnConsensus
if err != nil {
// There was some error in proxyApp
// TODO Report error and wait for proxyApp to be available.
return nil, ErrProxyAppConn(err)
}
return abciResponses, nil // events
eventBus types.BlockEventPublisher
// update these with block results after commit
mempool types.Mempool
evpool types.EvidencePool
logger log.Logger
} }
// NewBlockExecutor returns a new BlockExecutor with a NopEventBus.
// Call SetEventBus to provide one.
func NewBlockExecutor(db dbm.DB, logger log.Logger, proxyApp proxy.AppConnConsensus,
mempool types.Mempool, evpool types.EvidencePool) *BlockExecutor {
return &BlockExecutor{
db: db,
proxyApp: proxyApp,
eventBus: types.NopEventBus{},
mempool: mempool,
evpool: evpool,
logger: logger,
}
}
// SetEventBus - sets the event bus for publishing block related events.
// If not called, it defaults to types.NopEventBus.
func (blockExec *BlockExecutor) SetEventBus(eventBus types.BlockEventPublisher) {
blockExec.eventBus = eventBus
}
// ValidateBlock validates the given block against the given state.
// If the block is invalid, it returns an error.
// Validation does not mutate state, but does require historical information from the stateDB,
// ie. to verify evidence from a validator at an old height.
func (blockExec *BlockExecutor) ValidateBlock(s State, block *types.Block) error {
return validateBlock(blockExec.db, s, block)
}
// ApplyBlock validates the block against the state, executes it against the app,
// fires the relevant events, commits the app, and saves the new state and responses.
// It's the only function that needs to be called
// from outside this package to process and commit an entire block.
// It takes a blockID to avoid recomputing the parts hash.
func (blockExec *BlockExecutor) ApplyBlock(s State, blockID types.BlockID, block *types.Block) (State, error) {
if err := blockExec.ValidateBlock(s, block); err != nil {
return s, ErrInvalidBlock(err)
}
abciResponses, err := execBlockOnProxyApp(blockExec.logger, blockExec.proxyApp, block)
if err != nil {
return s, ErrProxyAppConn(err)
}
fail.Fail() // XXX
// save the results before we commit
saveABCIResponses(blockExec.db, block.Height, abciResponses)
fail.Fail() // XXX
// update the state with the block and responses
s, err = updateState(s, blockID, block.Header, abciResponses)
if err != nil {
return s, fmt.Errorf("Commit failed for application: %v", err)
}
// lock mempool, commit state, update mempoool
appHash, err := blockExec.Commit(block)
if err != nil {
return s, fmt.Errorf("Commit failed for application: %v", err)
}
fail.Fail() // XXX
// update the app hash and save the state
s.AppHash = appHash
SaveState(blockExec.db, s)
fail.Fail() // XXX
// Update evpool now that state is saved
// TODO: handle the crash/recover scenario
// ie. (may need to call Update for last block)
blockExec.evpool.Update(block)
// events are fired after everything else
// NOTE: if we crash between Commit and Save, events wont be fired during replay
fireEvents(blockExec.logger, blockExec.eventBus, block, abciResponses)
return s, nil
}
// Commit locks the mempool, runs the ABCI Commit message, and updates the mempool.
// It returns the result of calling abci.Commit (the AppHash), and an error.
// The Mempool must be locked during commit and update because state is typically reset on Commit and old txs must be replayed
// against committed state before new txs are run in the mempool, lest they be invalid.
func (blockExec *BlockExecutor) Commit(block *types.Block) ([]byte, error) {
blockExec.mempool.Lock()
defer blockExec.mempool.Unlock()
// Commit block, get hash back
res, err := blockExec.proxyApp.CommitSync()
if err != nil {
blockExec.logger.Error("Client error during proxyAppConn.CommitSync", "err", err)
return nil, err
}
if res.IsErr() {
blockExec.logger.Error("Error in proxyAppConn.CommitSync", "err", res)
return nil, res
}
if res.Log != "" {
blockExec.logger.Debug("Commit.Log: " + res.Log)
}
blockExec.logger.Info("Committed state", "height", block.Height, "txs", block.NumTxs, "appHash", res.Data)
// Update mempool.
if err := blockExec.mempool.Update(block.Height, block.Txs); err != nil {
return nil, err
}
return res.Data, nil
}
//---------------------------------------------------------
// Helper functions for executing blocks and updating state
// Executes block's transactions on proxyAppConn. // Executes block's transactions on proxyAppConn.
// Returns a list of transaction results and updates to the validator set // Returns a list of transaction results and updates to the validator set
// TODO: Generate a bitmap or otherwise store tx validity in state. func execBlockOnProxyApp(logger log.Logger, proxyAppConn proxy.AppConnConsensus, block *types.Block) (*ABCIResponses, error) {
func execBlockOnProxyApp(txEventPublisher types.TxEventPublisher, proxyAppConn proxy.AppConnConsensus, block *types.Block, logger log.Logger, lastValidators *types.ValidatorSet) (*ABCIResponses, error) {
var validTxs, invalidTxs = 0, 0 var validTxs, invalidTxs = 0, 0
txIndex := 0 txIndex := 0
@ -59,17 +176,6 @@ func execBlockOnProxyApp(txEventPublisher types.TxEventPublisher, proxyAppConn p
logger.Debug("Invalid tx", "code", txRes.Code, "log", txRes.Log) logger.Debug("Invalid tx", "code", txRes.Code, "log", txRes.Log)
invalidTxs++ invalidTxs++
} }
// NOTE: if we count we can access the tx from the block instead of
// pulling it from the req
tx := types.Tx(req.GetDeliverTx().Tx)
txEventPublisher.PublishEventTx(types.EventDataTx{types.TxResult{
Height: block.Height,
Index: uint32(txIndex),
Tx: tx,
Result: *txRes,
}})
abciResponses.DeliverTx[txIndex] = txRes abciResponses.DeliverTx[txIndex] = txRes
txIndex++ txIndex++
} }
@ -209,194 +315,91 @@ func changeInVotingPowerMoreOrEqualToOneThird(currentSet *types.ValidatorSet, up
return false, nil return false, nil
} }
// return a bit array of validators that signed the last commit // updateState returns a new State updated according to the header and responses.
// NOTE: assumes commits have already been authenticated func updateState(s State, blockID types.BlockID, header *types.Header,
/* function is currently unused abciResponses *ABCIResponses) (State, error) {
func commitBitArrayFromBlock(block *types.Block) *cmn.BitArray {
signed := cmn.NewBitArray(len(block.LastCommit.Precommits))
for i, precommit := range block.LastCommit.Precommits {
if precommit != nil {
signed.SetIndex(i, true) // val_.LastCommitHeight = block.Height - 1
}
}
return signed
}
*/
//----------------------------------------------------- // copy the valset so we can apply changes from EndBlock
// Validate block // and update s.LastValidators and s.Validators
prevValSet := s.Validators.Copy()
nextValSet := prevValSet.Copy()
// ValidateBlock validates the block against the state. // update the validator set with the latest abciResponses
func (s *State) ValidateBlock(block *types.Block) error { lastHeightValsChanged := s.LastHeightValidatorsChanged
return s.validateBlock(block) if len(abciResponses.EndBlock.ValidatorUpdates) > 0 {
} err := updateValidators(nextValSet, abciResponses.EndBlock.ValidatorUpdates)
// MakeBlock builds a block with the given txs and commit from the current state.
func (s *State) MakeBlock(height int64, txs []types.Tx, commit *types.Commit) (*types.Block, *types.PartSet) {
// build base block
block := types.MakeBlock(height, txs, commit)
// fill header with state data
block.ChainID = s.ChainID
block.TotalTxs = s.LastBlockTotalTx + block.NumTxs
block.LastBlockID = s.LastBlockID
block.ValidatorsHash = s.Validators.Hash()
block.AppHash = s.AppHash
block.ConsensusHash = s.ConsensusParams.Hash()
block.LastResultsHash = s.LastResultsHash
return block, block.MakePartSet(s.ConsensusParams.BlockGossip.BlockPartSizeBytes)
}
func (s *State) validateBlock(b *types.Block) error {
// validate internal consistency
if err := b.ValidateBasic(); err != nil {
return err
}
// validate basic info
if b.ChainID != s.ChainID {
return fmt.Errorf("Wrong Block.Header.ChainID. Expected %v, got %v", s.ChainID, b.ChainID)
}
if b.Height != s.LastBlockHeight+1 {
return fmt.Errorf("Wrong Block.Header.Height. Expected %v, got %v", s.LastBlockHeight+1, b.Height)
}
/* TODO: Determine bounds for Time
See blockchain/reactor "stopSyncingDurationMinutes"
if !b.Time.After(lastBlockTime) {
return errors.New("Invalid Block.Header.Time")
}
*/
// validate prev block info
if !b.LastBlockID.Equals(s.LastBlockID) {
return fmt.Errorf("Wrong Block.Header.LastBlockID. Expected %v, got %v", s.LastBlockID, b.LastBlockID)
}
newTxs := int64(len(b.Data.Txs))
if b.TotalTxs != s.LastBlockTotalTx+newTxs {
return fmt.Errorf("Wrong Block.Header.TotalTxs. Expected %v, got %v", s.LastBlockTotalTx+newTxs, b.TotalTxs)
}
// validate app info
if !bytes.Equal(b.AppHash, s.AppHash) {
return fmt.Errorf("Wrong Block.Header.AppHash. Expected %X, got %v", s.AppHash, b.AppHash)
}
if !bytes.Equal(b.ConsensusHash, s.ConsensusParams.Hash()) {
return fmt.Errorf("Wrong Block.Header.ConsensusHash. Expected %X, got %v", s.ConsensusParams.Hash(), b.ConsensusHash)
}
if !bytes.Equal(b.LastResultsHash, s.LastResultsHash) {
return fmt.Errorf("Wrong Block.Header.LastResultsHash. Expected %X, got %v", s.LastResultsHash, b.LastResultsHash)
}
if !bytes.Equal(b.ValidatorsHash, s.Validators.Hash()) {
return fmt.Errorf("Wrong Block.Header.ValidatorsHash. Expected %X, got %v", s.Validators.Hash(), b.ValidatorsHash)
}
// Validate block LastCommit.
if b.Height == 1 {
if len(b.LastCommit.Precommits) != 0 {
return errors.New("Block at height 1 (first block) should have no LastCommit precommits")
}
} else {
if len(b.LastCommit.Precommits) != s.LastValidators.Size() {
return fmt.Errorf("Invalid block commit size. Expected %v, got %v",
s.LastValidators.Size(), len(b.LastCommit.Precommits))
}
err := s.LastValidators.VerifyCommit(
s.ChainID, s.LastBlockID, b.Height-1, b.LastCommit)
if err != nil { if err != nil {
return err return s, fmt.Errorf("Error changing validator set: %v", err)
} }
// change results from this height but only applies to the next height
lastHeightValsChanged = header.Height + 1
} }
for _, ev := range b.Evidence.Evidence { // Update validator accums and set state variables
if _, err := s.VerifyEvidence(ev); err != nil { nextValSet.IncrementAccum(1)
return types.NewEvidenceInvalidErr(ev, err)
// update the params with the latest abciResponses
nextParams := s.ConsensusParams
lastHeightParamsChanged := s.LastHeightConsensusParamsChanged
if abciResponses.EndBlock.ConsensusParamUpdates != nil {
// NOTE: must not mutate s.ConsensusParams
nextParams = s.ConsensusParams.Update(abciResponses.EndBlock.ConsensusParamUpdates)
err := nextParams.Validate()
if err != nil {
return s, fmt.Errorf("Error updating consensus params: %v", err)
} }
// change results from this height but only applies to the next height
lastHeightParamsChanged = header.Height + 1
} }
return nil // NOTE: the AppHash has not been populated.
// It will be filled on state.Save.
return State{
ChainID: s.ChainID,
LastBlockHeight: header.Height,
LastBlockTotalTx: s.LastBlockTotalTx + header.NumTxs,
LastBlockID: blockID,
LastBlockTime: header.Time,
Validators: nextValSet,
LastValidators: s.Validators.Copy(),
LastHeightValidatorsChanged: lastHeightValsChanged,
ConsensusParams: nextParams,
LastHeightConsensusParamsChanged: lastHeightParamsChanged,
LastResultsHash: abciResponses.ResultsHash(),
AppHash: nil,
}, nil
} }
//----------------------------------------------------------------------------- // Fire NewBlock, NewBlockHeader.
// ApplyBlock validates & executes the block, updates state w/ ABCI responses, // Fire TxEvent for every tx.
// then commits and updates the mempool atomically, then saves state. // NOTE: if Tendermint crashes before commit, some or all of these events may be published again.
func fireEvents(logger log.Logger, eventBus types.BlockEventPublisher, block *types.Block, abciResponses *ABCIResponses) {
// ApplyBlock validates the block against the state, executes it against the app, // NOTE: do we still need this buffer ?
// commits it, and saves the block and state. It's the only function that needs to be called txEventBuffer := types.NewTxEventBuffer(eventBus, int(block.NumTxs))
// from outside this package to process and commit an entire block. for i, tx := range block.Data.Txs {
func (s *State) ApplyBlock(txEventPublisher types.TxEventPublisher, proxyAppConn proxy.AppConnConsensus, txEventBuffer.PublishEventTx(types.EventDataTx{types.TxResult{
block *types.Block, partsHeader types.PartSetHeader, Height: block.Height,
mempool types.Mempool, evpool types.EvidencePool) error { Index: uint32(i),
Tx: tx,
abciResponses, err := s.ValExecBlock(txEventPublisher, proxyAppConn, block) Result: *(abciResponses.DeliverTx[i]),
if err != nil { }})
return fmt.Errorf("Exec failed for application: %v", err)
} }
fail.Fail() // XXX eventBus.PublishEventNewBlock(types.EventDataNewBlock{block})
eventBus.PublishEventNewBlockHeader(types.EventDataNewBlockHeader{block.Header})
// save the results before we commit err := txEventBuffer.Flush()
s.SaveABCIResponses(block.Height, abciResponses)
fail.Fail() // XXX
// now update the block and validators
err = s.SetBlockAndValidators(block.Header, partsHeader, abciResponses)
if err != nil { if err != nil {
return fmt.Errorf("Commit failed for application: %v", err) logger.Error("Failed to flush event buffer", "err", err)
} }
// lock mempool, commit state, update mempoool
err = s.CommitStateUpdateMempool(proxyAppConn, block, mempool)
if err != nil {
return fmt.Errorf("Commit failed for application: %v", err)
}
fail.Fail() // XXX
evpool.MarkEvidenceAsCommitted(block.Evidence.Evidence)
// save the state and the validators
s.Save()
return nil
} }
// CommitStateUpdateMempool locks the mempool, runs the ABCI Commit message, and updates the mempool. //----------------------------------------------------------------------------------------------------
// The Mempool must be locked during commit and update because state is typically reset on Commit and old txs must be replayed // Execute block without state. TODO: eliminate
// against committed state before new txs are run in the mempool, lest they be invalid.
func (s *State) CommitStateUpdateMempool(proxyAppConn proxy.AppConnConsensus, block *types.Block, mempool types.Mempool) error {
mempool.Lock()
defer mempool.Unlock()
// Commit block, get hash back
res, err := proxyAppConn.CommitSync()
if err != nil {
s.logger.Error("Client error during proxyAppConn.CommitSync", "err", err)
return err
}
if res.IsErr() {
s.logger.Error("Error in proxyAppConn.CommitSync", "err", res)
return res
}
if res.Log != "" {
s.logger.Debug("Commit.Log: " + res.Log)
}
s.logger.Info("Committed state", "height", block.Height, "txs", block.NumTxs, "hash", res.Data)
// Set the state's new AppHash
s.AppHash = res.Data
// Update mempool.
return mempool.Update(block.Height, block.Txs)
}
// ExecCommitBlock executes and commits a block on the proxyApp without validating or mutating the state. // ExecCommitBlock executes and commits a block on the proxyApp without validating or mutating the state.
// It returns the application root hash (result of abci.Commit). // It returns the application root hash (result of abci.Commit).
func ExecCommitBlock(appConnConsensus proxy.AppConnConsensus, block *types.Block, logger log.Logger, lastValidators *types.ValidatorSet) ([]byte, error) { func ExecCommitBlock(appConnConsensus proxy.AppConnConsensus, block *types.Block, logger log.Logger) ([]byte, error) {
_, err := execBlockOnProxyApp(types.NopEventBus{}, appConnConsensus, block, logger, lastValidators) _, err := execBlockOnProxyApp(logger, appConnConsensus, block)
if err != nil { if err != nil {
logger.Error("Error executing block on proxy app", "height", block.Height, "err", err) logger.Error("Error executing block on proxy app", "height", block.Height, "err", err)
return nil, err return nil, err

View File

@ -23,64 +23,6 @@ var (
nTxsPerBlock = 10 nTxsPerBlock = 10
) )
func TestValidateBlock(t *testing.T) {
state := state()
state.SetLogger(log.TestingLogger())
// proper block must pass
block := makeBlock(state, 1)
err := state.ValidateBlock(block)
require.NoError(t, err)
// wrong chain fails
block = makeBlock(state, 1)
block.ChainID = "not-the-real-one"
err = state.ValidateBlock(block)
require.Error(t, err)
// wrong height fails
block = makeBlock(state, 1)
block.Height += 10
err = state.ValidateBlock(block)
require.Error(t, err)
// wrong total tx fails
block = makeBlock(state, 1)
block.TotalTxs += 10
err = state.ValidateBlock(block)
require.Error(t, err)
// wrong blockid fails
block = makeBlock(state, 1)
block.LastBlockID.PartsHeader.Total += 10
err = state.ValidateBlock(block)
require.Error(t, err)
// wrong app hash fails
block = makeBlock(state, 1)
block.AppHash = []byte("wrong app hash")
err = state.ValidateBlock(block)
require.Error(t, err)
// wrong consensus hash fails
block = makeBlock(state, 1)
block.ConsensusHash = []byte("wrong consensus hash")
err = state.ValidateBlock(block)
require.Error(t, err)
// wrong results hash fails
block = makeBlock(state, 1)
block.LastResultsHash = []byte("wrong results hash")
err = state.ValidateBlock(block)
require.Error(t, err)
// wrong validators hash fails
block = makeBlock(state, 1)
block.ValidatorsHash = []byte("wrong validators hash")
err = state.ValidateBlock(block)
require.Error(t, err)
}
func TestApplyBlock(t *testing.T) { func TestApplyBlock(t *testing.T) {
cc := proxy.NewLocalClientCreator(dummy.NewDummyApplication()) cc := proxy.NewLocalClientCreator(dummy.NewDummyApplication())
proxyApp := proxy.NewAppConns(cc, nil) proxyApp := proxy.NewAppConns(cc, nil)
@ -88,15 +30,15 @@ func TestApplyBlock(t *testing.T) {
require.Nil(t, err) require.Nil(t, err)
defer proxyApp.Stop() defer proxyApp.Stop()
state := state() state, stateDB := state(), dbm.NewMemDB()
state.SetLogger(log.TestingLogger())
block := makeBlock(state, 1) blockExec := NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(),
err = state.ApplyBlock(types.NopEventBus{}, proxyApp.Consensus(),
block, block.MakePartSet(testPartSize).Header(),
types.MockMempool{}, types.MockEvidencePool{}) types.MockMempool{}, types.MockEvidencePool{})
block := makeBlock(state, 1)
blockID := types.BlockID{block.Hash(), block.MakePartSet(testPartSize).Header()}
state, err = blockExec.ApplyBlock(state, blockID, block)
require.Nil(t, err) require.Nil(t, err)
// TODO check state and mempool // TODO check state and mempool
@ -112,15 +54,6 @@ func TestBeginBlockAbsentValidators(t *testing.T) {
defer proxyApp.Stop() defer proxyApp.Stop()
state := state() state := state()
state.SetLogger(log.TestingLogger())
// there were 2 validators
val1PrivKey := crypto.GenPrivKeyEd25519()
val2PrivKey := crypto.GenPrivKeyEd25519()
lastValidators := types.NewValidatorSet([]*types.Validator{
types.NewValidator(val1PrivKey.PubKey(), 10),
types.NewValidator(val2PrivKey.PubKey(), 5),
})
prevHash := state.LastBlockID.Hash prevHash := state.LastBlockID.Hash
prevParts := types.PartSetHeader{} prevParts := types.PartSetHeader{}
@ -141,7 +74,7 @@ func TestBeginBlockAbsentValidators(t *testing.T) {
lastCommit := &types.Commit{BlockID: prevBlockID, Precommits: tc.lastCommitPrecommits} lastCommit := &types.Commit{BlockID: prevBlockID, Precommits: tc.lastCommitPrecommits}
block, _ := state.MakeBlock(2, makeTxs(2), lastCommit) block, _ := state.MakeBlock(2, makeTxs(2), lastCommit)
_, err = ExecCommitBlock(proxyApp.Consensus(), block, log.TestingLogger(), lastValidators) _, err = ExecCommitBlock(proxyApp.Consensus(), block, log.TestingLogger())
require.Nil(t, err, tc.desc) require.Nil(t, err, tc.desc)
// -> app must receive an index of the absent validator // -> app must receive an index of the absent validator
@ -159,8 +92,8 @@ func makeTxs(height int64) (txs []types.Tx) {
return txs return txs
} }
func state() *State { func state() State {
s, _ := MakeGenesisState(dbm.NewMemDB(), &types.GenesisDoc{ s, _ := MakeGenesisState(&types.GenesisDoc{
ChainID: chainID, ChainID: chainID,
Validators: []types.GenesisValidator{ Validators: []types.GenesisValidator{
{privKey.PubKey(), 10000, "test"}, {privKey.PubKey(), 10000, "test"},
@ -170,7 +103,7 @@ func state() *State {
return s return s
} }
func makeBlock(state *State, height int64) *types.Block { func makeBlock(state State, height int64) *types.Block {
block, _ := state.MakeBlock(height, makeTxs(state.LastBlockHeight), new(types.Commit)) block, _ := state.MakeBlock(height, makeTxs(state.LastBlockHeight), new(types.Commit))
return block return block
} }

View File

@ -4,15 +4,8 @@ import (
"bytes" "bytes"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"sync"
"time" "time"
abci "github.com/tendermint/abci/types"
cmn "github.com/tendermint/tmlibs/common"
dbm "github.com/tendermint/tmlibs/db"
"github.com/tendermint/tmlibs/log"
wire "github.com/tendermint/go-wire" wire "github.com/tendermint/go-wire"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
@ -23,36 +16,19 @@ var (
stateKey = []byte("stateKey") stateKey = []byte("stateKey")
) )
func calcValidatorsKey(height int64) []byte {
return []byte(cmn.Fmt("validatorsKey:%v", height))
}
func calcConsensusParamsKey(height int64) []byte {
return []byte(cmn.Fmt("consensusParamsKey:%v", height))
}
func calcABCIResponsesKey(height int64) []byte {
return []byte(cmn.Fmt("abciResponsesKey:%v", height))
}
//----------------------------------------------------------------------------- //-----------------------------------------------------------------------------
// State is a short description of the latest committed block of the Tendermint consensus. // State is a short description of the latest committed block of the Tendermint consensus.
// It keeps all information necessary to validate new blocks, // It keeps all information necessary to validate new blocks,
// including the last validator set and the consensus params. // including the last validator set and the consensus params.
// All fields are exposed so the struct can be easily serialized, // All fields are exposed so the struct can be easily serialized,
// but the fields should only be changed by calling state.SetBlockAndValidators. // but none of them should be mutated directly.
// Instead, use state.Copy() or state.NextState(...).
// NOTE: not goroutine-safe. // NOTE: not goroutine-safe.
type State struct { type State struct {
// mtx for writing to db
mtx sync.Mutex
db dbm.DB
// Immutable // Immutable
ChainID string ChainID string
// Exposed fields are updated by SetBlockAndValidators.
// LastBlockHeight=0 at genesis (ie. block(H=0) does not exist) // LastBlockHeight=0 at genesis (ie. block(H=0) does not exist)
LastBlockHeight int64 LastBlockHeight int64
LastBlockTotalTx int64 LastBlockTotalTx int64
@ -78,61 +54,11 @@ type State struct {
// The latest AppHash we've received from calling abci.Commit() // The latest AppHash we've received from calling abci.Commit()
AppHash []byte AppHash []byte
logger log.Logger
}
// GetState loads the most recent state from the database,
// or creates a new one from the given genesisFile and persists the result
// to the database.
func GetState(stateDB dbm.DB, genesisFile string) (*State, error) {
state := LoadState(stateDB)
if state == nil {
var err error
state, err = MakeGenesisStateFromFile(stateDB, genesisFile)
if err != nil {
return nil, err
}
state.Save()
}
return state, nil
}
// LoadState loads the State from the database.
func LoadState(db dbm.DB) *State {
return loadState(db, stateKey)
}
func loadState(db dbm.DB, key []byte) *State {
buf := db.Get(key)
if len(buf) == 0 {
return nil
}
s := &State{db: db}
r, n, err := bytes.NewReader(buf), new(int), new(error)
wire.ReadBinaryPtr(&s, r, 0, n, err)
if *err != nil {
// DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED
cmn.Exit(cmn.Fmt(`LoadState: Data has been corrupted or its spec has changed:
%v\n`, *err))
}
// TODO: ensure that buf is completely read.
return s
}
// SetLogger sets the logger on the State.
func (s *State) SetLogger(l log.Logger) {
s.logger = l
} }
// Copy makes a copy of the State for mutating. // Copy makes a copy of the State for mutating.
func (s *State) Copy() *State { func (s State) Copy() State {
return &State{ return State{
db: s.db,
ChainID: s.ChainID, ChainID: s.ChainID,
LastBlockHeight: s.LastBlockHeight, LastBlockHeight: s.LastBlockHeight,
@ -150,327 +76,47 @@ func (s *State) Copy() *State {
AppHash: s.AppHash, AppHash: s.AppHash,
LastResultsHash: s.LastResultsHash, LastResultsHash: s.LastResultsHash,
logger: s.logger,
} }
} }
// Save persists the State to the database.
func (s *State) Save() {
s.mtx.Lock()
defer s.mtx.Unlock()
s.saveValidatorsInfo()
s.saveConsensusParamsInfo()
s.db.SetSync(stateKey, s.Bytes())
}
// SaveABCIResponses persists the ABCIResponses to the database.
// This is useful in case we crash after app.Commit and before s.Save().
// Responses are indexed by height so they can also be loaded later to produce Merkle proofs.
func (s *State) SaveABCIResponses(height int64, abciResponses *ABCIResponses) {
s.db.SetSync(calcABCIResponsesKey(height), abciResponses.Bytes())
}
// LoadABCIResponses loads the ABCIResponses for the given height from the database.
// This is useful for recovering from crashes where we called app.Commit and before we called
// s.Save(). It can also be used to produce Merkle proofs of the result of txs.
func (s *State) LoadABCIResponses(height int64) (*ABCIResponses, error) {
buf := s.db.Get(calcABCIResponsesKey(height))
if len(buf) == 0 {
return nil, ErrNoABCIResponsesForHeight{height}
}
abciResponses := new(ABCIResponses)
r, n, err := bytes.NewReader(buf), new(int), new(error)
wire.ReadBinaryPtr(abciResponses, r, 0, n, err)
if *err != nil {
// DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED
cmn.Exit(cmn.Fmt(`LoadABCIResponses: Data has been corrupted or its spec has
changed: %v\n`, *err))
}
// TODO: ensure that buf is completely read.
return abciResponses, nil
}
// LoadValidators loads the ValidatorSet for a given height.
// Returns ErrNoValSetForHeight if the validator set can't be found for this height.
func (s *State) LoadValidators(height int64) (*types.ValidatorSet, error) {
valInfo := s.loadValidatorsInfo(height)
if valInfo == nil {
return nil, ErrNoValSetForHeight{height}
}
if valInfo.ValidatorSet == nil {
valInfo = s.loadValidatorsInfo(valInfo.LastHeightChanged)
if valInfo == nil {
cmn.PanicSanity(fmt.Sprintf(`Couldn't find validators at height %d as
last changed from height %d`, valInfo.LastHeightChanged, height))
}
}
return valInfo.ValidatorSet, nil
}
func (s *State) loadValidatorsInfo(height int64) *ValidatorsInfo {
buf := s.db.Get(calcValidatorsKey(height))
if len(buf) == 0 {
return nil
}
v := new(ValidatorsInfo)
r, n, err := bytes.NewReader(buf), new(int), new(error)
wire.ReadBinaryPtr(v, r, 0, n, err)
if *err != nil {
// DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED
cmn.Exit(cmn.Fmt(`LoadValidators: Data has been corrupted or its spec has changed:
%v\n`, *err))
}
// TODO: ensure that buf is completely read.
return v
}
// saveValidatorsInfo persists the validator set for the next block to disk.
// It should be called from s.Save(), right before the state itself is persisted.
// If the validator set did not change after processing the latest block,
// only the last height for which the validators changed is persisted.
func (s *State) saveValidatorsInfo() {
changeHeight := s.LastHeightValidatorsChanged
nextHeight := s.LastBlockHeight + 1
valInfo := &ValidatorsInfo{
LastHeightChanged: changeHeight,
}
if changeHeight == nextHeight {
valInfo.ValidatorSet = s.Validators
}
s.db.SetSync(calcValidatorsKey(nextHeight), valInfo.Bytes())
}
// LoadConsensusParams loads the ConsensusParams for a given height.
func (s *State) LoadConsensusParams(height int64) (types.ConsensusParams, error) {
empty := types.ConsensusParams{}
paramsInfo := s.loadConsensusParamsInfo(height)
if paramsInfo == nil {
return empty, ErrNoConsensusParamsForHeight{height}
}
if paramsInfo.ConsensusParams == empty {
paramsInfo = s.loadConsensusParamsInfo(paramsInfo.LastHeightChanged)
if paramsInfo == nil {
cmn.PanicSanity(fmt.Sprintf(`Couldn't find consensus params at height %d as
last changed from height %d`, paramsInfo.LastHeightChanged, height))
}
}
return paramsInfo.ConsensusParams, nil
}
func (s *State) loadConsensusParamsInfo(height int64) *ConsensusParamsInfo {
buf := s.db.Get(calcConsensusParamsKey(height))
if len(buf) == 0 {
return nil
}
paramsInfo := new(ConsensusParamsInfo)
r, n, err := bytes.NewReader(buf), new(int), new(error)
wire.ReadBinaryPtr(paramsInfo, r, 0, n, err)
if *err != nil {
// DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED
cmn.Exit(cmn.Fmt(`LoadConsensusParams: Data has been corrupted or its spec has changed:
%v\n`, *err))
}
// TODO: ensure that buf is completely read.
return paramsInfo
}
// saveConsensusParamsInfo persists the consensus params for the next block to disk.
// It should be called from s.Save(), right before the state itself is persisted.
// If the consensus params did not change after processing the latest block,
// only the last height for which they changed is persisted.
func (s *State) saveConsensusParamsInfo() {
changeHeight := s.LastHeightConsensusParamsChanged
nextHeight := s.LastBlockHeight + 1
paramsInfo := &ConsensusParamsInfo{
LastHeightChanged: changeHeight,
}
if changeHeight == nextHeight {
paramsInfo.ConsensusParams = s.ConsensusParams
}
s.db.SetSync(calcConsensusParamsKey(nextHeight), paramsInfo.Bytes())
}
// Equals returns true if the States are identical. // Equals returns true if the States are identical.
func (s *State) Equals(s2 *State) bool { func (s State) Equals(s2 State) bool {
return bytes.Equal(s.Bytes(), s2.Bytes()) return bytes.Equal(s.Bytes(), s2.Bytes())
} }
// Bytes serializes the State using go-wire. // Bytes serializes the State using go-wire.
func (s *State) Bytes() []byte { func (s State) Bytes() []byte {
return wire.BinaryBytes(s) return wire.BinaryBytes(s)
} }
// SetBlockAndValidators mutates State variables // IsEmpty returns true if the State is equal to the empty State.
// to update block and validators after running EndBlock. func (s State) IsEmpty() bool {
func (s *State) SetBlockAndValidators(header *types.Header, blockPartsHeader types.PartSetHeader, return s.Validators == nil // XXX can't compare to Empty
abciResponses *ABCIResponses) error {
// copy the valset so we can apply changes from EndBlock
// and update s.LastValidators and s.Validators
prevValSet := s.Validators.Copy()
nextValSet := prevValSet.Copy()
// update the validator set with the latest abciResponses
if len(abciResponses.EndBlock.ValidatorUpdates) > 0 {
err := updateValidators(nextValSet, abciResponses.EndBlock.ValidatorUpdates)
if err != nil {
return fmt.Errorf("Error changing validator set: %v", err)
}
// change results from this height but only applies to the next height
s.LastHeightValidatorsChanged = header.Height + 1
}
// Update validator accums and set state variables
nextValSet.IncrementAccum(1)
// update the params with the latest abciResponses
nextParams := s.ConsensusParams
if abciResponses.EndBlock.ConsensusParamUpdates != nil {
// NOTE: must not mutate s.ConsensusParams
nextParams = s.ConsensusParams.Update(abciResponses.EndBlock.ConsensusParamUpdates)
err := nextParams.Validate()
if err != nil {
return fmt.Errorf("Error updating consensus params: %v", err)
}
// change results from this height but only applies to the next height
s.LastHeightConsensusParamsChanged = header.Height + 1
}
s.setBlockAndValidators(header.Height,
header.NumTxs,
types.BlockID{header.Hash(), blockPartsHeader},
header.Time,
nextValSet,
nextParams,
abciResponses.ResultsHash())
return nil
}
func (s *State) setBlockAndValidators(height int64,
newTxs int64, blockID types.BlockID, blockTime time.Time,
valSet *types.ValidatorSet,
params types.ConsensusParams,
resultsHash []byte) {
s.LastBlockHeight = height
s.LastBlockTotalTx += newTxs
s.LastBlockID = blockID
s.LastBlockTime = blockTime
s.LastValidators = s.Validators.Copy()
s.Validators = valSet
s.ConsensusParams = params
s.LastResultsHash = resultsHash
} }
// GetValidators returns the last and current validator sets. // GetValidators returns the last and current validator sets.
func (s *State) GetValidators() (last *types.ValidatorSet, current *types.ValidatorSet) { func (s State) GetValidators() (last *types.ValidatorSet, current *types.ValidatorSet) {
return s.LastValidators, s.Validators return s.LastValidators, s.Validators
} }
// VerifyEvidence verifies the evidence fully by checking it is internally
// consistent and corresponds to an existing or previous validator.
// It returns the priority of this evidence, or an error.
// NOTE: return error may be ErrNoValSetForHeight, in which case the validator set
// for the evidence height could not be loaded.
func (s *State) VerifyEvidence(evidence types.Evidence) (priority int64, err error) {
evidenceAge := s.LastBlockHeight - evidence.Height()
maxAge := s.ConsensusParams.EvidenceParams.MaxAge
if evidenceAge > maxAge {
return priority, fmt.Errorf("Evidence from height %d is too old. Min height is %d",
evidence.Height(), s.LastBlockHeight-maxAge)
}
if err := evidence.Verify(s.ChainID); err != nil {
return priority, err
}
// The address must have been an active validator at the height
ev := evidence
height, addr, idx := ev.Height(), ev.Address(), ev.Index()
valset, err := s.LoadValidators(height)
if err != nil {
// XXX/TODO: what do we do if we can't load the valset?
// eg. if we have pruned the state or height is too high?
return priority, err
}
valIdx, val := valset.GetByAddress(addr)
if val == nil {
return priority, fmt.Errorf("Address %X was not a validator at height %d", addr, height)
} else if idx != valIdx {
return priority, fmt.Errorf("Address %X was validator %d at height %d, not %d", addr, valIdx, height, idx)
}
priority = val.VotingPower
return priority, nil
}
//------------------------------------------------------------------------ //------------------------------------------------------------------------
// Create a block from the latest state
// ABCIResponses retains the responses // MakeBlock builds a block with the given txs and commit from the current state.
// of the various ABCI calls during block processing. func (s State) MakeBlock(height int64, txs []types.Tx, commit *types.Commit) (*types.Block, *types.PartSet) {
// It is persisted to disk for each height before calling Commit. // build base block
type ABCIResponses struct { block := types.MakeBlock(height, txs, commit)
DeliverTx []*abci.ResponseDeliverTx
EndBlock *abci.ResponseEndBlock
}
// NewABCIResponses returns a new ABCIResponses // fill header with state data
func NewABCIResponses(block *types.Block) *ABCIResponses { block.ChainID = s.ChainID
return &ABCIResponses{ block.TotalTxs = s.LastBlockTotalTx + block.NumTxs
DeliverTx: make([]*abci.ResponseDeliverTx, block.NumTxs), block.LastBlockID = s.LastBlockID
} block.ValidatorsHash = s.Validators.Hash()
} block.AppHash = s.AppHash
block.ConsensusHash = s.ConsensusParams.Hash()
block.LastResultsHash = s.LastResultsHash
// Bytes serializes the ABCIResponse using go-wire return block, block.MakePartSet(s.ConsensusParams.BlockGossip.BlockPartSizeBytes)
func (a *ABCIResponses) Bytes() []byte {
return wire.BinaryBytes(*a)
}
func (a *ABCIResponses) ResultsHash() []byte {
results := types.NewResults(a.DeliverTx)
return results.Hash()
}
//-----------------------------------------------------------------------------
// ValidatorsInfo represents the latest validator set, or the last height it changed
type ValidatorsInfo struct {
ValidatorSet *types.ValidatorSet
LastHeightChanged int64
}
// Bytes serializes the ValidatorsInfo using go-wire
func (valInfo *ValidatorsInfo) Bytes() []byte {
return wire.BinaryBytes(*valInfo)
}
//-----------------------------------------------------------------------------
// ConsensusParamsInfo represents the latest consensus params, or the last height it changed
type ConsensusParamsInfo struct {
ConsensusParams types.ConsensusParams
LastHeightChanged int64
}
// Bytes serializes the ConsensusParamsInfo using go-wire
func (params ConsensusParamsInfo) Bytes() []byte {
return wire.BinaryBytes(params)
} }
//------------------------------------------------------------------------ //------------------------------------------------------------------------
@ -480,12 +126,12 @@ func (params ConsensusParamsInfo) Bytes() []byte {
// file. // file.
// //
// Used during replay and in tests. // Used during replay and in tests.
func MakeGenesisStateFromFile(db dbm.DB, genDocFile string) (*State, error) { func MakeGenesisStateFromFile(genDocFile string) (State, error) {
genDoc, err := MakeGenesisDocFromFile(genDocFile) genDoc, err := MakeGenesisDocFromFile(genDocFile)
if err != nil { if err != nil {
return nil, err return State{}, err
} }
return MakeGenesisState(db, genDoc) return MakeGenesisState(genDoc)
} }
// MakeGenesisDocFromFile reads and unmarshals genesis doc from the given file. // MakeGenesisDocFromFile reads and unmarshals genesis doc from the given file.
@ -502,10 +148,10 @@ func MakeGenesisDocFromFile(genDocFile string) (*types.GenesisDoc, error) {
} }
// MakeGenesisState creates state from types.GenesisDoc. // MakeGenesisState creates state from types.GenesisDoc.
func MakeGenesisState(db dbm.DB, genDoc *types.GenesisDoc) (*State, error) { func MakeGenesisState(genDoc *types.GenesisDoc) (State, error) {
err := genDoc.ValidateAndComplete() err := genDoc.ValidateAndComplete()
if err != nil { if err != nil {
return nil, fmt.Errorf("Error in genesis file: %v", err) return State{}, fmt.Errorf("Error in genesis file: %v", err)
} }
// Make validators slice // Make validators slice
@ -522,8 +168,7 @@ func MakeGenesisState(db dbm.DB, genDoc *types.GenesisDoc) (*State, error) {
} }
} }
return &State{ return State{
db: db,
ChainID: genDoc.ChainID, ChainID: genDoc.ChainID,

View File

@ -14,19 +14,17 @@ import (
cmn "github.com/tendermint/tmlibs/common" cmn "github.com/tendermint/tmlibs/common"
dbm "github.com/tendermint/tmlibs/db" dbm "github.com/tendermint/tmlibs/db"
"github.com/tendermint/tmlibs/log"
cfg "github.com/tendermint/tendermint/config" cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/types" "github.com/tendermint/tendermint/types"
) )
// setupTestCase does setup common to all test cases // setupTestCase does setup common to all test cases
func setupTestCase(t *testing.T) (func(t *testing.T), dbm.DB, *State) { func setupTestCase(t *testing.T) (func(t *testing.T), dbm.DB, State) {
config := cfg.ResetTestRoot("state_") config := cfg.ResetTestRoot("state_")
stateDB := dbm.NewDB("state", config.DBBackend, config.DBDir()) stateDB := dbm.NewDB("state", config.DBBackend, config.DBDir())
state, err := GetState(stateDB, config.GenesisFile()) state, err := LoadStateFromDBOrGenesisFile(stateDB, config.GenesisFile())
assert.NoError(t, err, "expected no error on GetState") assert.NoError(t, err, "expected no error on LoadStateFromDBOrGenesisFile")
state.SetLogger(log.TestingLogger())
tearDown := func(t *testing.T) {} tearDown := func(t *testing.T) {}
@ -59,7 +57,7 @@ func TestStateSaveLoad(t *testing.T) {
assert := assert.New(t) assert := assert.New(t)
state.LastBlockHeight++ state.LastBlockHeight++
state.Save() SaveState(stateDB, state)
loadedState := LoadState(stateDB) loadedState := LoadState(stateDB)
assert.True(state.Equals(loadedState), assert.True(state.Equals(loadedState),
@ -69,7 +67,7 @@ func TestStateSaveLoad(t *testing.T) {
// TestABCIResponsesSaveLoad tests saving and loading ABCIResponses. // TestABCIResponsesSaveLoad tests saving and loading ABCIResponses.
func TestABCIResponsesSaveLoad1(t *testing.T) { func TestABCIResponsesSaveLoad1(t *testing.T) {
tearDown, _, state := setupTestCase(t) tearDown, stateDB, state := setupTestCase(t)
defer tearDown(t) defer tearDown(t)
// nolint: vetshadow // nolint: vetshadow
assert := assert.New(t) assert := assert.New(t)
@ -88,8 +86,8 @@ func TestABCIResponsesSaveLoad1(t *testing.T) {
}, },
}} }}
state.SaveABCIResponses(block.Height, abciResponses) saveABCIResponses(stateDB, block.Height, abciResponses)
loadedAbciResponses, err := state.LoadABCIResponses(block.Height) loadedAbciResponses, err := LoadABCIResponses(stateDB, block.Height)
assert.Nil(err) assert.Nil(err)
assert.Equal(abciResponses, loadedAbciResponses, assert.Equal(abciResponses, loadedAbciResponses,
cmn.Fmt(`ABCIResponses don't match: Got %v, Expected %v`, loadedAbciResponses, cmn.Fmt(`ABCIResponses don't match: Got %v, Expected %v`, loadedAbciResponses,
@ -98,7 +96,7 @@ func TestABCIResponsesSaveLoad1(t *testing.T) {
// TestResultsSaveLoad tests saving and loading abci results. // TestResultsSaveLoad tests saving and loading abci results.
func TestABCIResponsesSaveLoad2(t *testing.T) { func TestABCIResponsesSaveLoad2(t *testing.T) {
tearDown, _, state := setupTestCase(t) tearDown, stateDB, _ := setupTestCase(t)
defer tearDown(t) defer tearDown(t)
// nolint: vetshadow // nolint: vetshadow
assert := assert.New(t) assert := assert.New(t)
@ -142,7 +140,7 @@ func TestABCIResponsesSaveLoad2(t *testing.T) {
// query all before, should return error // query all before, should return error
for i := range cases { for i := range cases {
h := int64(i + 1) h := int64(i + 1)
res, err := state.LoadABCIResponses(h) res, err := LoadABCIResponses(stateDB, h)
assert.Error(err, "%d: %#v", i, res) assert.Error(err, "%d: %#v", i, res)
} }
@ -153,13 +151,13 @@ func TestABCIResponsesSaveLoad2(t *testing.T) {
DeliverTx: tc.added, DeliverTx: tc.added,
EndBlock: &abci.ResponseEndBlock{}, EndBlock: &abci.ResponseEndBlock{},
} }
state.SaveABCIResponses(h, responses) saveABCIResponses(stateDB, h, responses)
} }
// query all before, should return expected value // query all before, should return expected value
for i, tc := range cases { for i, tc := range cases {
h := int64(i + 1) h := int64(i + 1)
res, err := state.LoadABCIResponses(h) res, err := LoadABCIResponses(stateDB, h)
assert.NoError(err, "%d", i) assert.NoError(err, "%d", i)
assert.Equal(tc.expected.Hash(), res.ResultsHash(), "%d", i) assert.Equal(tc.expected.Hash(), res.ResultsHash(), "%d", i)
} }
@ -167,54 +165,57 @@ func TestABCIResponsesSaveLoad2(t *testing.T) {
// TestValidatorSimpleSaveLoad tests saving and loading validators. // TestValidatorSimpleSaveLoad tests saving and loading validators.
func TestValidatorSimpleSaveLoad(t *testing.T) { func TestValidatorSimpleSaveLoad(t *testing.T) {
tearDown, _, state := setupTestCase(t) tearDown, stateDB, state := setupTestCase(t)
defer tearDown(t) defer tearDown(t)
// nolint: vetshadow // nolint: vetshadow
assert := assert.New(t) assert := assert.New(t)
// can't load anything for height 0 // can't load anything for height 0
v, err := state.LoadValidators(0) v, err := LoadValidators(stateDB, 0)
assert.IsType(ErrNoValSetForHeight{}, err, "expected err at height 0") assert.IsType(ErrNoValSetForHeight{}, err, "expected err at height 0")
// should be able to load for height 1 // should be able to load for height 1
v, err = state.LoadValidators(1) v, err = LoadValidators(stateDB, 1)
assert.Nil(err, "expected no err at height 1") assert.Nil(err, "expected no err at height 1")
assert.Equal(v.Hash(), state.Validators.Hash(), "expected validator hashes to match") assert.Equal(v.Hash(), state.Validators.Hash(), "expected validator hashes to match")
// increment height, save; should be able to load for next height // increment height, save; should be able to load for next height
state.LastBlockHeight++ state.LastBlockHeight++
state.saveValidatorsInfo() nextHeight := state.LastBlockHeight + 1
v, err = state.LoadValidators(state.LastBlockHeight + 1) saveValidatorsInfo(stateDB, nextHeight, state.LastHeightValidatorsChanged, state.Validators)
v, err = LoadValidators(stateDB, nextHeight)
assert.Nil(err, "expected no err") assert.Nil(err, "expected no err")
assert.Equal(v.Hash(), state.Validators.Hash(), "expected validator hashes to match") assert.Equal(v.Hash(), state.Validators.Hash(), "expected validator hashes to match")
// increment height, save; should be able to load for next height // increment height, save; should be able to load for next height
state.LastBlockHeight += 10 state.LastBlockHeight += 10
state.saveValidatorsInfo() nextHeight = state.LastBlockHeight + 1
v, err = state.LoadValidators(state.LastBlockHeight + 1) saveValidatorsInfo(stateDB, nextHeight, state.LastHeightValidatorsChanged, state.Validators)
v, err = LoadValidators(stateDB, nextHeight)
assert.Nil(err, "expected no err") assert.Nil(err, "expected no err")
assert.Equal(v.Hash(), state.Validators.Hash(), "expected validator hashes to match") assert.Equal(v.Hash(), state.Validators.Hash(), "expected validator hashes to match")
// should be able to load for next next height // should be able to load for next next height
_, err = state.LoadValidators(state.LastBlockHeight + 2) _, err = LoadValidators(stateDB, state.LastBlockHeight+2)
assert.IsType(ErrNoValSetForHeight{}, err, "expected err at unknown height") assert.IsType(ErrNoValSetForHeight{}, err, "expected err at unknown height")
} }
// TestValidatorChangesSaveLoad tests saving and loading a validator set with changes. // TestValidatorChangesSaveLoad tests saving and loading a validator set with changes.
func TestOneValidatorChangesSaveLoad(t *testing.T) { func TestOneValidatorChangesSaveLoad(t *testing.T) {
tearDown, _, state := setupTestCase(t) tearDown, stateDB, state := setupTestCase(t)
defer tearDown(t) defer tearDown(t)
// change vals at these heights // change vals at these heights
changeHeights := []int64{1, 2, 4, 5, 10, 15, 16, 17, 20} changeHeights := []int64{1, 2, 4, 5, 10, 15, 16, 17, 20}
N := len(changeHeights) N := len(changeHeights)
// build the validator history by running SetBlockAndValidators // build the validator history by running updateState
// with the right validator set for each height // with the right validator set for each height
highestHeight := changeHeights[N-1] + 5 highestHeight := changeHeights[N-1] + 5
changeIndex := 0 changeIndex := 0
_, val := state.Validators.GetByIndex(0) _, val := state.Validators.GetByIndex(0)
power := val.VotingPower power := val.VotingPower
var err error
for i := int64(1); i < highestHeight; i++ { for i := int64(1); i < highestHeight; i++ {
// when we get to a change height, // when we get to a change height,
// use the next pubkey // use the next pubkey
@ -222,10 +223,11 @@ func TestOneValidatorChangesSaveLoad(t *testing.T) {
changeIndex++ changeIndex++
power += 1 power += 1
} }
header, parts, responses := makeHeaderPartsResponsesValPowerChange(state, i, power) header, blockID, responses := makeHeaderPartsResponsesValPowerChange(state, i, power)
err := state.SetBlockAndValidators(header, parts, responses) state, err = updateState(state, blockID, header, responses)
assert.Nil(t, err) assert.Nil(t, err)
state.saveValidatorsInfo() nextHeight := state.LastBlockHeight + 1
saveValidatorsInfo(stateDB, nextHeight, state.LastHeightValidatorsChanged, state.Validators)
} }
// on each change height, increment the power by one. // on each change height, increment the power by one.
@ -243,7 +245,7 @@ func TestOneValidatorChangesSaveLoad(t *testing.T) {
} }
for i, power := range testCases { for i, power := range testCases {
v, err := state.LoadValidators(int64(i + 1)) v, err := LoadValidators(stateDB, int64(i+1))
assert.Nil(t, err, fmt.Sprintf("expected no err at height %d", i)) assert.Nil(t, err, fmt.Sprintf("expected no err at height %d", i))
assert.Equal(t, v.Size(), 1, "validator set size is greater than 1: %d", v.Size()) assert.Equal(t, v.Size(), 1, "validator set size is greater than 1: %d", v.Size())
_, val := v.GetByIndex(0) _, val := v.GetByIndex(0)
@ -257,20 +259,22 @@ func TestOneValidatorChangesSaveLoad(t *testing.T) {
// changes. // changes.
func TestManyValidatorChangesSaveLoad(t *testing.T) { func TestManyValidatorChangesSaveLoad(t *testing.T) {
const valSetSize = 7 const valSetSize = 7
tearDown, _, state := setupTestCase(t) tearDown, stateDB, state := setupTestCase(t)
state.Validators = genValSet(valSetSize) state.Validators = genValSet(valSetSize)
state.Save() SaveState(stateDB, state)
defer tearDown(t) defer tearDown(t)
const height = 1 const height = 1
pubkey := crypto.GenPrivKeyEd25519().PubKey() pubkey := crypto.GenPrivKeyEd25519().PubKey()
// swap the first validator with a new one ^^^ (validator set size stays the same) // swap the first validator with a new one ^^^ (validator set size stays the same)
header, parts, responses := makeHeaderPartsResponsesValPubKeyChange(state, height, pubkey) header, blockID, responses := makeHeaderPartsResponsesValPubKeyChange(state, height, pubkey)
err := state.SetBlockAndValidators(header, parts, responses) var err error
state, err = updateState(state, blockID, header, responses)
require.Nil(t, err) require.Nil(t, err)
state.saveValidatorsInfo() nextHeight := state.LastBlockHeight + 1
saveValidatorsInfo(stateDB, nextHeight, state.LastHeightValidatorsChanged, state.Validators)
v, err := state.LoadValidators(height + 1) v, err := LoadValidators(stateDB, height+1)
assert.Nil(t, err) assert.Nil(t, err)
assert.Equal(t, valSetSize, v.Size()) assert.Equal(t, valSetSize, v.Size())
@ -292,7 +296,7 @@ func genValSet(size int) *types.ValidatorSet {
// TestConsensusParamsChangesSaveLoad tests saving and loading consensus params // TestConsensusParamsChangesSaveLoad tests saving and loading consensus params
// with changes. // with changes.
func TestConsensusParamsChangesSaveLoad(t *testing.T) { func TestConsensusParamsChangesSaveLoad(t *testing.T) {
tearDown, _, state := setupTestCase(t) tearDown, stateDB, state := setupTestCase(t)
defer tearDown(t) defer tearDown(t)
// change vals at these heights // change vals at these heights
@ -308,11 +312,12 @@ func TestConsensusParamsChangesSaveLoad(t *testing.T) {
params[i].BlockSize.MaxBytes += i params[i].BlockSize.MaxBytes += i
} }
// build the params history by running SetBlockAndValidators // build the params history by running updateState
// with the right params set for each height // with the right params set for each height
highestHeight := changeHeights[N-1] + 5 highestHeight := changeHeights[N-1] + 5
changeIndex := 0 changeIndex := 0
cp := params[changeIndex] cp := params[changeIndex]
var err error
for i := int64(1); i < highestHeight; i++ { for i := int64(1); i < highestHeight; i++ {
// when we get to a change height, // when we get to a change height,
// use the next params // use the next params
@ -320,10 +325,12 @@ func TestConsensusParamsChangesSaveLoad(t *testing.T) {
changeIndex++ changeIndex++
cp = params[changeIndex] cp = params[changeIndex]
} }
header, parts, responses := makeHeaderPartsResponsesParams(state, i, cp) header, blockID, responses := makeHeaderPartsResponsesParams(state, i, cp)
err := state.SetBlockAndValidators(header, parts, responses) state, err = updateState(state, blockID, header, responses)
require.Nil(t, err) require.Nil(t, err)
state.saveConsensusParamsInfo() nextHeight := state.LastBlockHeight + 1
saveConsensusParamsInfo(stateDB, nextHeight, state.LastHeightConsensusParamsChanged, state.ConsensusParams)
} }
// make all the test cases by using the same params until after the change // make all the test cases by using the same params until after the change
@ -341,7 +348,7 @@ func TestConsensusParamsChangesSaveLoad(t *testing.T) {
} }
for _, testCase := range testCases { for _, testCase := range testCases {
p, err := state.LoadConsensusParams(testCase.height) p, err := LoadConsensusParams(stateDB, testCase.height)
assert.Nil(t, err, fmt.Sprintf("expected no err at height %d", testCase.height)) assert.Nil(t, err, fmt.Sprintf("expected no err at height %d", testCase.height))
assert.Equal(t, testCase.params, p, fmt.Sprintf(`unexpected consensus params at assert.Equal(t, testCase.params, p, fmt.Sprintf(`unexpected consensus params at
height %d`, testCase.height)) height %d`, testCase.height))
@ -416,15 +423,15 @@ func TestLessThanOneThirdOfVotingPowerPerBlockEnforced(t *testing.T) {
} }
for i, tc := range testCases { for i, tc := range testCases {
tearDown, _, state := setupTestCase(t) tearDown, stateDB, state := setupTestCase(t)
state.Validators = genValSet(tc.initialValSetSize) state.Validators = genValSet(tc.initialValSetSize)
state.Save() SaveState(stateDB, state)
height := state.LastBlockHeight + 1 height := state.LastBlockHeight + 1
block := makeBlock(state, height) block := makeBlock(state, height)
abciResponses := &ABCIResponses{ abciResponses := &ABCIResponses{
EndBlock: &abci.ResponseEndBlock{ValidatorUpdates: tc.valUpdatesFn(state.Validators)}, EndBlock: &abci.ResponseEndBlock{ValidatorUpdates: tc.valUpdatesFn(state.Validators)},
} }
err := state.SetBlockAndValidators(block.Header, types.PartSetHeader{}, abciResponses) state, err := updateState(state, types.BlockID{block.Hash(), types.PartSetHeader{}}, block.Header, abciResponses)
if tc.shouldErr { if tc.shouldErr {
assert.Error(t, err, "#%d", i) assert.Error(t, err, "#%d", i)
} else { } else {
@ -484,8 +491,8 @@ func TestApplyUpdates(t *testing.T) {
} }
} }
func makeHeaderPartsResponsesValPubKeyChange(state *State, height int64, func makeHeaderPartsResponsesValPubKeyChange(state State, height int64,
pubkey crypto.PubKey) (*types.Header, types.PartSetHeader, *ABCIResponses) { pubkey crypto.PubKey) (*types.Header, types.BlockID, *ABCIResponses) {
block := makeBlock(state, height) block := makeBlock(state, height)
abciResponses := &ABCIResponses{ abciResponses := &ABCIResponses{
@ -503,11 +510,11 @@ func makeHeaderPartsResponsesValPubKeyChange(state *State, height int64,
} }
} }
return block.Header, types.PartSetHeader{}, abciResponses return block.Header, types.BlockID{block.Hash(), types.PartSetHeader{}}, abciResponses
} }
func makeHeaderPartsResponsesValPowerChange(state *State, height int64, func makeHeaderPartsResponsesValPowerChange(state State, height int64,
power int64) (*types.Header, types.PartSetHeader, *ABCIResponses) { power int64) (*types.Header, types.BlockID, *ABCIResponses) {
block := makeBlock(state, height) block := makeBlock(state, height)
abciResponses := &ABCIResponses{ abciResponses := &ABCIResponses{
@ -524,17 +531,17 @@ func makeHeaderPartsResponsesValPowerChange(state *State, height int64,
} }
} }
return block.Header, types.PartSetHeader{}, abciResponses return block.Header, types.BlockID{block.Hash(), types.PartSetHeader{}}, abciResponses
} }
func makeHeaderPartsResponsesParams(state *State, height int64, func makeHeaderPartsResponsesParams(state State, height int64,
params types.ConsensusParams) (*types.Header, types.PartSetHeader, *ABCIResponses) { params types.ConsensusParams) (*types.Header, types.BlockID, *ABCIResponses) {
block := makeBlock(state, height) block := makeBlock(state, height)
abciResponses := &ABCIResponses{ abciResponses := &ABCIResponses{
EndBlock: &abci.ResponseEndBlock{ConsensusParamUpdates: types.TM2PB.ConsensusParams(&params)}, EndBlock: &abci.ResponseEndBlock{ConsensusParamUpdates: types.TM2PB.ConsensusParams(&params)},
} }
return block.Header, types.PartSetHeader{}, abciResponses return block.Header, types.BlockID{block.Hash(), types.PartSetHeader{}}, abciResponses
} }
type paramsChangeTestCase struct { type paramsChangeTestCase struct {
@ -542,13 +549,13 @@ type paramsChangeTestCase struct {
params types.ConsensusParams params types.ConsensusParams
} }
func makeHeaderPartsResults(state *State, height int64, func makeHeaderPartsResults(state State, height int64,
results []*abci.ResponseDeliverTx) (*types.Header, types.PartSetHeader, *ABCIResponses) { results []*abci.ResponseDeliverTx) (*types.Header, types.BlockID, *ABCIResponses) {
block := makeBlock(state, height) block := makeBlock(state, height)
abciResponses := &ABCIResponses{ abciResponses := &ABCIResponses{
DeliverTx: results, DeliverTx: results,
EndBlock: &abci.ResponseEndBlock{}, EndBlock: &abci.ResponseEndBlock{},
} }
return block.Header, types.PartSetHeader{}, abciResponses return block.Header, types.BlockID{block.Hash(), types.PartSetHeader{}}, abciResponses
} }

282
state/store.go Normal file
View File

@ -0,0 +1,282 @@
package state
import (
"bytes"
"fmt"
abci "github.com/tendermint/abci/types"
wire "github.com/tendermint/go-wire"
"github.com/tendermint/tendermint/types"
cmn "github.com/tendermint/tmlibs/common"
dbm "github.com/tendermint/tmlibs/db"
)
//------------------------------------------------------------------------
func calcValidatorsKey(height int64) []byte {
return []byte(cmn.Fmt("validatorsKey:%v", height))
}
func calcConsensusParamsKey(height int64) []byte {
return []byte(cmn.Fmt("consensusParamsKey:%v", height))
}
func calcABCIResponsesKey(height int64) []byte {
return []byte(cmn.Fmt("abciResponsesKey:%v", height))
}
// LoadStateFromDBOrGenesisFile loads the most recent state from the database,
// or creates a new one from the given genesisFilePath and persists the result
// to the database.
func LoadStateFromDBOrGenesisFile(stateDB dbm.DB, genesisFilePath string) (State, error) {
state := LoadState(stateDB)
if state.IsEmpty() {
var err error
state, err = MakeGenesisStateFromFile(genesisFilePath)
if err != nil {
return state, err
}
SaveState(stateDB, state)
}
return state, nil
}
// LoadStateFromDBOrGenesisDoc loads the most recent state from the database,
// or creates a new one from the given genesisDoc and persists the result
// to the database.
func LoadStateFromDBOrGenesisDoc(stateDB dbm.DB, genesisDoc *types.GenesisDoc) (State, error) {
state := LoadState(stateDB)
if state.IsEmpty() {
var err error
state, err = MakeGenesisState(genesisDoc)
if err != nil {
return state, err
}
SaveState(stateDB, state)
}
return state, nil
}
// LoadState loads the State from the database.
func LoadState(db dbm.DB) State {
return loadState(db, stateKey)
}
func loadState(db dbm.DB, key []byte) (state State) {
buf := db.Get(key)
if len(buf) == 0 {
return state
}
r, n, err := bytes.NewReader(buf), new(int), new(error)
wire.ReadBinaryPtr(&state, r, 0, n, err)
if *err != nil {
// DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED
cmn.Exit(cmn.Fmt(`LoadState: Data has been corrupted or its spec has changed:
%v\n`, *err))
}
// TODO: ensure that buf is completely read.
return state
}
// SaveState persists the State, the ValidatorsInfo, and the ConsensusParamsInfo to the database.
func SaveState(db dbm.DB, s State) {
saveState(db, s, stateKey)
}
func saveState(db dbm.DB, s State, key []byte) {
nextHeight := s.LastBlockHeight + 1
saveValidatorsInfo(db, nextHeight, s.LastHeightValidatorsChanged, s.Validators)
saveConsensusParamsInfo(db, nextHeight, s.LastHeightConsensusParamsChanged, s.ConsensusParams)
db.SetSync(stateKey, s.Bytes())
}
//------------------------------------------------------------------------
// ABCIResponses retains the responses
// of the various ABCI calls during block processing.
// It is persisted to disk for each height before calling Commit.
type ABCIResponses struct {
DeliverTx []*abci.ResponseDeliverTx
EndBlock *abci.ResponseEndBlock
}
// NewABCIResponses returns a new ABCIResponses
func NewABCIResponses(block *types.Block) *ABCIResponses {
return &ABCIResponses{
DeliverTx: make([]*abci.ResponseDeliverTx, block.NumTxs),
}
}
// Bytes serializes the ABCIResponse using go-wire
func (a *ABCIResponses) Bytes() []byte {
return wire.BinaryBytes(*a)
}
func (a *ABCIResponses) ResultsHash() []byte {
results := types.NewResults(a.DeliverTx)
return results.Hash()
}
// LoadABCIResponses loads the ABCIResponses for the given height from the database.
// This is useful for recovering from crashes where we called app.Commit and before we called
// s.Save(). It can also be used to produce Merkle proofs of the result of txs.
func LoadABCIResponses(db dbm.DB, height int64) (*ABCIResponses, error) {
buf := db.Get(calcABCIResponsesKey(height))
if len(buf) == 0 {
return nil, ErrNoABCIResponsesForHeight{height}
}
abciResponses := new(ABCIResponses)
r, n, err := bytes.NewReader(buf), new(int), new(error)
wire.ReadBinaryPtr(abciResponses, r, 0, n, err)
if *err != nil {
// DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED
cmn.Exit(cmn.Fmt(`LoadABCIResponses: Data has been corrupted or its spec has
changed: %v\n`, *err))
}
// TODO: ensure that buf is completely read.
return abciResponses, nil
}
// SaveABCIResponses persists the ABCIResponses to the database.
// This is useful in case we crash after app.Commit and before s.Save().
// Responses are indexed by height so they can also be loaded later to produce Merkle proofs.
func saveABCIResponses(db dbm.DB, height int64, abciResponses *ABCIResponses) {
db.SetSync(calcABCIResponsesKey(height), abciResponses.Bytes())
}
//-----------------------------------------------------------------------------
// ValidatorsInfo represents the latest validator set, or the last height it changed
type ValidatorsInfo struct {
ValidatorSet *types.ValidatorSet
LastHeightChanged int64
}
// Bytes serializes the ValidatorsInfo using go-wire
func (valInfo *ValidatorsInfo) Bytes() []byte {
return wire.BinaryBytes(*valInfo)
}
// LoadValidators loads the ValidatorSet for a given height.
// Returns ErrNoValSetForHeight if the validator set can't be found for this height.
func LoadValidators(db dbm.DB, height int64) (*types.ValidatorSet, error) {
valInfo := loadValidatorsInfo(db, height)
if valInfo == nil {
return nil, ErrNoValSetForHeight{height}
}
if valInfo.ValidatorSet == nil {
valInfo = loadValidatorsInfo(db, valInfo.LastHeightChanged)
if valInfo == nil {
cmn.PanicSanity(fmt.Sprintf(`Couldn't find validators at height %d as
last changed from height %d`, valInfo.LastHeightChanged, height))
}
}
return valInfo.ValidatorSet, nil
}
func loadValidatorsInfo(db dbm.DB, height int64) *ValidatorsInfo {
buf := db.Get(calcValidatorsKey(height))
if len(buf) == 0 {
return nil
}
v := new(ValidatorsInfo)
r, n, err := bytes.NewReader(buf), new(int), new(error)
wire.ReadBinaryPtr(v, r, 0, n, err)
if *err != nil {
// DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED
cmn.Exit(cmn.Fmt(`LoadValidators: Data has been corrupted or its spec has changed:
%v\n`, *err))
}
// TODO: ensure that buf is completely read.
return v
}
// saveValidatorsInfo persists the validator set for the next block to disk.
// It should be called from s.Save(), right before the state itself is persisted.
// If the validator set did not change after processing the latest block,
// only the last height for which the validators changed is persisted.
func saveValidatorsInfo(db dbm.DB, nextHeight, changeHeight int64, valSet *types.ValidatorSet) {
valInfo := &ValidatorsInfo{
LastHeightChanged: changeHeight,
}
if changeHeight == nextHeight {
valInfo.ValidatorSet = valSet
}
db.SetSync(calcValidatorsKey(nextHeight), valInfo.Bytes())
}
//-----------------------------------------------------------------------------
// ConsensusParamsInfo represents the latest consensus params, or the last height it changed
type ConsensusParamsInfo struct {
ConsensusParams types.ConsensusParams
LastHeightChanged int64
}
// Bytes serializes the ConsensusParamsInfo using go-wire
func (params ConsensusParamsInfo) Bytes() []byte {
return wire.BinaryBytes(params)
}
// LoadConsensusParams loads the ConsensusParams for a given height.
func LoadConsensusParams(db dbm.DB, height int64) (types.ConsensusParams, error) {
empty := types.ConsensusParams{}
paramsInfo := loadConsensusParamsInfo(db, height)
if paramsInfo == nil {
return empty, ErrNoConsensusParamsForHeight{height}
}
if paramsInfo.ConsensusParams == empty {
paramsInfo = loadConsensusParamsInfo(db, paramsInfo.LastHeightChanged)
if paramsInfo == nil {
cmn.PanicSanity(fmt.Sprintf(`Couldn't find consensus params at height %d as
last changed from height %d`, paramsInfo.LastHeightChanged, height))
}
}
return paramsInfo.ConsensusParams, nil
}
func loadConsensusParamsInfo(db dbm.DB, height int64) *ConsensusParamsInfo {
buf := db.Get(calcConsensusParamsKey(height))
if len(buf) == 0 {
return nil
}
paramsInfo := new(ConsensusParamsInfo)
r, n, err := bytes.NewReader(buf), new(int), new(error)
wire.ReadBinaryPtr(paramsInfo, r, 0, n, err)
if *err != nil {
// DATA HAS BEEN CORRUPTED OR THE SPEC HAS CHANGED
cmn.Exit(cmn.Fmt(`LoadConsensusParams: Data has been corrupted or its spec has changed:
%v\n`, *err))
}
// TODO: ensure that buf is completely read.
return paramsInfo
}
// saveConsensusParamsInfo persists the consensus params for the next block to disk.
// It should be called from s.Save(), right before the state itself is persisted.
// If the consensus params did not change after processing the latest block,
// only the last height for which they changed is persisted.
func saveConsensusParamsInfo(db dbm.DB, nextHeight, changeHeight int64, params types.ConsensusParams) {
paramsInfo := &ConsensusParamsInfo{
LastHeightChanged: changeHeight,
}
if changeHeight == nextHeight {
paramsInfo.ConsensusParams = params
}
db.SetSync(calcConsensusParamsKey(nextHeight), paramsInfo.Bytes())
}

122
state/validation.go Normal file
View File

@ -0,0 +1,122 @@
package state
import (
"bytes"
"errors"
"fmt"
"github.com/tendermint/tendermint/types"
dbm "github.com/tendermint/tmlibs/db"
)
//-----------------------------------------------------
// Validate block
func validateBlock(stateDB dbm.DB, s State, b *types.Block) error {
// validate internal consistency
if err := b.ValidateBasic(); err != nil {
return err
}
// validate basic info
if b.ChainID != s.ChainID {
return fmt.Errorf("Wrong Block.Header.ChainID. Expected %v, got %v", s.ChainID, b.ChainID)
}
if b.Height != s.LastBlockHeight+1 {
return fmt.Errorf("Wrong Block.Header.Height. Expected %v, got %v", s.LastBlockHeight+1, b.Height)
}
/* TODO: Determine bounds for Time
See blockchain/reactor "stopSyncingDurationMinutes"
if !b.Time.After(lastBlockTime) {
return errors.New("Invalid Block.Header.Time")
}
*/
// validate prev block info
if !b.LastBlockID.Equals(s.LastBlockID) {
return fmt.Errorf("Wrong Block.Header.LastBlockID. Expected %v, got %v", s.LastBlockID, b.LastBlockID)
}
newTxs := int64(len(b.Data.Txs))
if b.TotalTxs != s.LastBlockTotalTx+newTxs {
return fmt.Errorf("Wrong Block.Header.TotalTxs. Expected %v, got %v", s.LastBlockTotalTx+newTxs, b.TotalTxs)
}
// validate app info
if !bytes.Equal(b.AppHash, s.AppHash) {
return fmt.Errorf("Wrong Block.Header.AppHash. Expected %X, got %v", s.AppHash, b.AppHash)
}
if !bytes.Equal(b.ConsensusHash, s.ConsensusParams.Hash()) {
return fmt.Errorf("Wrong Block.Header.ConsensusHash. Expected %X, got %v", s.ConsensusParams.Hash(), b.ConsensusHash)
}
if !bytes.Equal(b.LastResultsHash, s.LastResultsHash) {
return fmt.Errorf("Wrong Block.Header.LastResultsHash. Expected %X, got %v", s.LastResultsHash, b.LastResultsHash)
}
if !bytes.Equal(b.ValidatorsHash, s.Validators.Hash()) {
return fmt.Errorf("Wrong Block.Header.ValidatorsHash. Expected %X, got %v", s.Validators.Hash(), b.ValidatorsHash)
}
// Validate block LastCommit.
if b.Height == 1 {
if len(b.LastCommit.Precommits) != 0 {
return errors.New("Block at height 1 (first block) should have no LastCommit precommits")
}
} else {
if len(b.LastCommit.Precommits) != s.LastValidators.Size() {
return fmt.Errorf("Invalid block commit size. Expected %v, got %v",
s.LastValidators.Size(), len(b.LastCommit.Precommits))
}
err := s.LastValidators.VerifyCommit(
s.ChainID, s.LastBlockID, b.Height-1, b.LastCommit)
if err != nil {
return err
}
}
for _, ev := range b.Evidence.Evidence {
if err := VerifyEvidence(stateDB, s, ev); err != nil {
return types.NewEvidenceInvalidErr(ev, err)
}
}
return nil
}
// XXX: What's cheaper (ie. what should be checked first):
// evidence internal validity (ie. sig checks) or validator existed (fetch historical val set from db)
// VerifyEvidence verifies the evidence fully by checking it is internally
// consistent and sufficiently recent.
func VerifyEvidence(stateDB dbm.DB, s State, evidence types.Evidence) error {
height := s.LastBlockHeight
evidenceAge := height - evidence.Height()
maxAge := s.ConsensusParams.EvidenceParams.MaxAge
if evidenceAge > maxAge {
return fmt.Errorf("Evidence from height %d is too old. Min height is %d",
evidence.Height(), height-maxAge)
}
if err := evidence.Verify(s.ChainID); err != nil {
return err
}
valset, err := LoadValidators(stateDB, evidence.Height())
if err != nil {
// TODO: if err is just that we cant find it cuz we pruned, ignore.
// TODO: if its actually bad evidence, punish peer
return err
}
// The address must have been an active validator at the height
ev := evidence
height, addr, idx := ev.Height(), ev.Address(), ev.Index()
valIdx, val := valset.GetByAddress(addr)
if val == nil {
return fmt.Errorf("Address %X was not a validator at height %d", addr, height)
} else if idx != valIdx {
return fmt.Errorf("Address %X was validator %d at height %d, not %d", addr, valIdx, height, idx)
}
return nil
}

68
state/validation_test.go Normal file
View File

@ -0,0 +1,68 @@
package state
import (
"testing"
"github.com/stretchr/testify/require"
dbm "github.com/tendermint/tmlibs/db"
"github.com/tendermint/tmlibs/log"
)
func TestValidateBlock(t *testing.T) {
state := state()
blockExec := NewBlockExecutor(dbm.NewMemDB(), log.TestingLogger(), nil, nil, nil)
// proper block must pass
block := makeBlock(state, 1)
err := blockExec.ValidateBlock(state, block)
require.NoError(t, err)
// wrong chain fails
block = makeBlock(state, 1)
block.ChainID = "not-the-real-one"
err = blockExec.ValidateBlock(state, block)
require.Error(t, err)
// wrong height fails
block = makeBlock(state, 1)
block.Height += 10
err = blockExec.ValidateBlock(state, block)
require.Error(t, err)
// wrong total tx fails
block = makeBlock(state, 1)
block.TotalTxs += 10
err = blockExec.ValidateBlock(state, block)
require.Error(t, err)
// wrong blockid fails
block = makeBlock(state, 1)
block.LastBlockID.PartsHeader.Total += 10
err = blockExec.ValidateBlock(state, block)
require.Error(t, err)
// wrong app hash fails
block = makeBlock(state, 1)
block.AppHash = []byte("wrong app hash")
err = blockExec.ValidateBlock(state, block)
require.Error(t, err)
// wrong consensus hash fails
block = makeBlock(state, 1)
block.ConsensusHash = []byte("wrong consensus hash")
err = blockExec.ValidateBlock(state, block)
require.Error(t, err)
// wrong results hash fails
block = makeBlock(state, 1)
block.LastResultsHash = []byte("wrong results hash")
err = blockExec.ValidateBlock(state, block)
require.Error(t, err)
// wrong validators hash fails
block = makeBlock(state, 1)
block.ValidatorsHash = []byte("wrong validators hash")
err = blockExec.ValidateBlock(state, block)
require.Error(t, err)
}

View File

@ -175,6 +175,13 @@ func QueryForEvent(eventType string) tmpubsub.Query {
return tmquery.MustParse(fmt.Sprintf("%s='%s'", EventTypeKey, eventType)) return tmquery.MustParse(fmt.Sprintf("%s='%s'", EventTypeKey, eventType))
} }
// BlockEventPublisher publishes all block related events
type BlockEventPublisher interface {
PublishEventNewBlock(block EventDataNewBlock) error
PublishEventNewBlockHeader(header EventDataNewBlockHeader) error
PublishEventTx(EventDataTx) error
}
type TxEventPublisher interface { type TxEventPublisher interface {
PublishEventTx(EventDataTx) error PublishEventTx(EventDataTx) error
} }

View File

@ -70,15 +70,6 @@ type BlockStore interface {
SaveBlock(block *Block, blockParts *PartSet, seenCommit *Commit) SaveBlock(block *Block, blockParts *PartSet, seenCommit *Commit)
} }
//------------------------------------------------------
// state
// State defines the stateful interface used to verify evidence.
// UNSTABLE
type State interface {
VerifyEvidence(Evidence) (priority int64, err error)
}
//------------------------------------------------------ //------------------------------------------------------
// evidence pool // evidence pool
@ -87,7 +78,7 @@ type State interface {
type EvidencePool interface { type EvidencePool interface {
PendingEvidence() []Evidence PendingEvidence() []Evidence
AddEvidence(Evidence) error AddEvidence(Evidence) error
MarkEvidenceAsCommitted([]Evidence) Update(*Block)
} }
// MockMempool is an empty implementation of a Mempool, useful for testing. // MockMempool is an empty implementation of a Mempool, useful for testing.
@ -95,6 +86,6 @@ type EvidencePool interface {
type MockEvidencePool struct { type MockEvidencePool struct {
} }
func (m MockEvidencePool) PendingEvidence() []Evidence { return nil } func (m MockEvidencePool) PendingEvidence() []Evidence { return nil }
func (m MockEvidencePool) AddEvidence(Evidence) error { return nil } func (m MockEvidencePool) AddEvidence(Evidence) error { return nil }
func (m MockEvidencePool) MarkEvidenceAsCommitted([]Evidence) {} func (m MockEvidencePool) Update(*Block) {}