move types/services.go to state pkg. pass State to evpool.Update

This commit is contained in:
Ethan Buchman
2018-06-04 13:46:34 -07:00
parent edb851280a
commit 3d33226e80
12 changed files with 58 additions and 67 deletions

View File

@ -36,7 +36,7 @@ func newBlockchainReactor(logger log.Logger, maxBlockHeight int64) *BlockchainRe
fastSync := true fastSync := true
var nilApp proxy.AppConnConsensus var nilApp proxy.AppConnConsensus
blockExec := sm.NewBlockExecutor(dbm.NewMemDB(), log.TestingLogger(), nilApp, blockExec := sm.NewBlockExecutor(dbm.NewMemDB(), log.TestingLogger(), nilApp,
types.MockMempool{}, types.MockEvidencePool{}) sm.MockMempool{}, sm.MockEvidencePool{})
bcReactor := NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync) bcReactor := NewBlockchainReactor(state.Copy(), blockExec, blockStore, fastSync)
bcReactor.SetLogger(logger.With("module", "blockchain")) bcReactor.SetLogger(logger.With("module", "blockchain"))

View File

@ -262,7 +262,7 @@ func newConsensusStateWithConfigAndBlockStore(thisConfig *cfg.Config, state sm.S
} }
// mock the evidence pool // mock the evidence pool
evpool := types.MockEvidencePool{} evpool := sm.MockEvidencePool{}
// Make ConsensusState // Make ConsensusState
stateDB := dbm.NewMemDB() stateDB := dbm.NewMemDB()

View File

@ -196,7 +196,7 @@ func makeHeightSearchFunc(height int64) auto.SearchFunc {
type Handshaker struct { type Handshaker struct {
stateDB dbm.DB stateDB dbm.DB
initialState sm.State initialState sm.State
store types.BlockStore store sm.BlockStore
appState json.RawMessage appState json.RawMessage
logger log.Logger logger log.Logger
@ -204,7 +204,7 @@ type Handshaker struct {
} }
func NewHandshaker(stateDB dbm.DB, state sm.State, func NewHandshaker(stateDB dbm.DB, state sm.State,
store types.BlockStore, appState json.RawMessage) *Handshaker { store sm.BlockStore, appState json.RawMessage) *Handshaker {
return &Handshaker{ return &Handshaker{
stateDB: stateDB, stateDB: stateDB,
@ -390,7 +390,7 @@ func (h *Handshaker) replayBlock(state sm.State, height int64, proxyApp proxy.Ap
block := h.store.LoadBlock(height) block := h.store.LoadBlock(height)
meta := h.store.LoadBlockMeta(height) meta := h.store.LoadBlockMeta(height)
blockExec := sm.NewBlockExecutor(h.stateDB, h.logger, proxyApp, types.MockMempool{}, types.MockEvidencePool{}) blockExec := sm.NewBlockExecutor(h.stateDB, h.logger, proxyApp, sm.MockMempool{}, sm.MockEvidencePool{})
var err error var err error
state, err = blockExec.ApplyBlock(state, meta.BlockID, block) state, err = blockExec.ApplyBlock(state, meta.BlockID, block)

View File

@ -310,7 +310,7 @@ func newConsensusStateForReplay(config cfg.BaseConfig, csConfig *cfg.ConsensusCo
cmn.Exit(cmn.Fmt("Failed to start event bus: %v", err)) cmn.Exit(cmn.Fmt("Failed to start event bus: %v", err))
} }
mempool, evpool := types.MockMempool{}, types.MockEvidencePool{} mempool, evpool := sm.MockMempool{}, sm.MockEvidencePool{}
blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool) blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool)
consensusState := NewConsensusState(csConfig, state.Copy(), blockExec, consensusState := NewConsensusState(csConfig, state.Copy(), blockExec,

View File

@ -263,8 +263,8 @@ const (
) )
var ( var (
mempool = types.MockMempool{} mempool = sm.MockMempool{}
evpool = types.MockEvidencePool{} evpool = sm.MockEvidencePool{}
) )
//--------------------------------------- //---------------------------------------

View File

@ -76,9 +76,9 @@ type ConsensusState struct {
// services for creating and executing blocks // services for creating and executing blocks
// TODO: encapsulate all of this in one "BlockManager" // TODO: encapsulate all of this in one "BlockManager"
blockExec *sm.BlockExecutor blockExec *sm.BlockExecutor
blockStore types.BlockStore blockStore sm.BlockStore
mempool types.Mempool mempool sm.Mempool
evpool types.EvidencePool evpool sm.EvidencePool
// internal state // internal state
mtx sync.Mutex mtx sync.Mutex
@ -118,7 +118,7 @@ type ConsensusState struct {
} }
// NewConsensusState returns a new ConsensusState. // NewConsensusState returns a new ConsensusState.
func NewConsensusState(config *cfg.ConsensusConfig, state sm.State, blockExec *sm.BlockExecutor, blockStore types.BlockStore, mempool types.Mempool, evpool types.EvidencePool) *ConsensusState { func NewConsensusState(config *cfg.ConsensusConfig, state sm.State, blockExec *sm.BlockExecutor, blockStore sm.BlockStore, mempool sm.Mempool, evpool sm.EvidencePool) *ConsensusState {
cs := &ConsensusState{ cs := &ConsensusState{
config: config, config: config,
blockExec: blockExec, blockExec: blockExec,

View File

@ -65,8 +65,8 @@ func WALWithNBlocks(numBlocks int) (data []byte, err error) {
return nil, errors.Wrap(err, "failed to start event bus") return nil, errors.Wrap(err, "failed to start event bus")
} }
defer eventBus.Stop() defer eventBus.Stop()
mempool := types.MockMempool{} mempool := sm.MockMempool{}
evpool := types.MockEvidencePool{} evpool := sm.MockEvidencePool{}
blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool) blockExec := sm.NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), mempool, evpool)
consensusState := NewConsensusState(config.Consensus, state.Copy(), blockExec, blockStore, mempool, evpool) consensusState := NewConsensusState(config.Consensus, state.Copy(), blockExec, blockStore, mempool, evpool)
consensusState.SetLogger(logger) consensusState.SetLogger(logger)

View File

@ -68,13 +68,13 @@ func (evpool *EvidencePool) State() sm.State {
} }
// Update loads the latest // Update loads the latest
func (evpool *EvidencePool) Update(block *types.Block) { func (evpool *EvidencePool) Update(block *types.Block, state sm.State) {
evpool.mtx.Lock() evpool.mtx.Lock()
defer evpool.mtx.Unlock() defer evpool.mtx.Unlock()
state := sm.LoadState(evpool.stateDB) // sanity check
if state.LastBlockHeight != block.Height { if state.LastBlockHeight != block.Height {
panic(fmt.Sprintf("EvidencePool.Update: loaded state with height %d when block.Height=%d", state.LastBlockHeight, block.Height)) panic(fmt.Sprintf("Failed EvidencePool.Update sanity check: got state.Height=%d with block.Height=%d", state.LastBlockHeight, block.Height))
} }
evpool.state = state evpool.state = state

View File

@ -51,9 +51,9 @@ var (
// interfaces defined in types and above // interfaces defined in types and above
stateDB dbm.DB stateDB dbm.DB
blockStore types.BlockStore blockStore sm.BlockStore
mempool types.Mempool mempool sm.Mempool
evidencePool types.EvidencePool evidencePool sm.EvidencePool
consensusState Consensus consensusState Consensus
p2pSwitch P2P p2pSwitch P2P
@ -72,15 +72,15 @@ func SetStateDB(db dbm.DB) {
stateDB = db stateDB = db
} }
func SetBlockStore(bs types.BlockStore) { func SetBlockStore(bs sm.BlockStore) {
blockStore = bs blockStore = bs
} }
func SetMempool(mem types.Mempool) { func SetMempool(mem sm.Mempool) {
mempool = mem mempool = mem
} }
func SetEvidencePool(evpool types.EvidencePool) { func SetEvidencePool(evpool sm.EvidencePool) {
evidencePool = evpool evidencePool = evpool
} }

View File

@ -29,8 +29,8 @@ type BlockExecutor struct {
eventBus types.BlockEventPublisher eventBus types.BlockEventPublisher
// update these with block results after commit // update these with block results after commit
mempool types.Mempool mempool Mempool
evpool types.EvidencePool evpool EvidencePool
logger log.Logger logger log.Logger
} }
@ -38,7 +38,7 @@ type BlockExecutor struct {
// NewBlockExecutor returns a new BlockExecutor with a NopEventBus. // NewBlockExecutor returns a new BlockExecutor with a NopEventBus.
// Call SetEventBus to provide one. // Call SetEventBus to provide one.
func NewBlockExecutor(db dbm.DB, logger log.Logger, proxyApp proxy.AppConnConsensus, func NewBlockExecutor(db dbm.DB, logger log.Logger, proxyApp proxy.AppConnConsensus,
mempool types.Mempool, evpool types.EvidencePool) *BlockExecutor { mempool Mempool, evpool EvidencePool) *BlockExecutor {
return &BlockExecutor{ return &BlockExecutor{
db: db, db: db,
proxyApp: proxyApp, proxyApp: proxyApp,
@ -98,6 +98,9 @@ func (blockExec *BlockExecutor) ApplyBlock(state State, blockID types.BlockID, b
return state, fmt.Errorf("Commit failed for application: %v", err) return state, fmt.Errorf("Commit failed for application: %v", err)
} }
// Update evpool with the block and state.
blockExec.evpool.Update(block, state)
fail.Fail() // XXX fail.Fail() // XXX
// update the app hash and save the state // update the app hash and save the state
@ -106,11 +109,6 @@ func (blockExec *BlockExecutor) ApplyBlock(state State, blockID types.BlockID, b
fail.Fail() // XXX fail.Fail() // XXX
// Update evpool now that state is saved.
// TODO: handle the crash/recover scenario
// ie. (may need to call Update for last block)
blockExec.evpool.Update(block)
// events are fired after everything else // events are fired after everything else
// NOTE: if we crash between Commit and Save, events wont be fired during replay // NOTE: if we crash between Commit and Save, events wont be fired during replay
fireEvents(blockExec.logger, blockExec.eventBus, block, abciResponses) fireEvents(blockExec.logger, blockExec.eventBus, block, abciResponses)

View File

@ -34,7 +34,7 @@ func TestApplyBlock(t *testing.T) {
state, stateDB := state(), dbm.NewMemDB() state, stateDB := state(), dbm.NewMemDB()
blockExec := NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(), blockExec := NewBlockExecutor(stateDB, log.TestingLogger(), proxyApp.Consensus(),
types.MockMempool{}, types.MockEvidencePool{}) sm.MockMempool{}, sm.MockEvidencePool{})
block := makeBlock(state, 1) block := makeBlock(state, 1)
blockID := types.BlockID{block.Hash(), block.MakePartSet(testPartSize).Header()} blockID := types.BlockID{block.Hash(), block.MakePartSet(testPartSize).Header()}

View File

@ -1,11 +1,10 @@
package types package state
import ( import (
abci "github.com/tendermint/abci/types" abci "github.com/tendermint/abci/types"
"github.com/tendermint/tendermint/types"
) )
// NOTE/XXX: all type definitions in this file are considered UNSTABLE
//------------------------------------------------------ //------------------------------------------------------
// blockchain services types // blockchain services types
// NOTE: Interfaces used by RPC must be thread safe! // NOTE: Interfaces used by RPC must be thread safe!
@ -17,15 +16,14 @@ import (
// Mempool defines the mempool interface as used by the ConsensusState. // Mempool defines the mempool interface as used by the ConsensusState.
// Updates to the mempool need to be synchronized with committing a block // Updates to the mempool need to be synchronized with committing a block
// so apps can reset their transient state on Commit // so apps can reset their transient state on Commit
// UNSTABLE
type Mempool interface { type Mempool interface {
Lock() Lock()
Unlock() Unlock()
Size() int Size() int
CheckTx(Tx, func(*abci.Response)) error CheckTx(types.Tx, func(*abci.Response)) error
Reap(int) Txs Reap(int) types.Txs
Update(height int64, txs Txs) error Update(height int64, txs types.Txs) error
Flush() Flush()
FlushAppConn() error FlushAppConn() error
@ -34,16 +32,15 @@ type Mempool interface {
} }
// MockMempool is an empty implementation of a Mempool, useful for testing. // MockMempool is an empty implementation of a Mempool, useful for testing.
// UNSTABLE
type MockMempool struct { type MockMempool struct {
} }
func (m MockMempool) Lock() {} func (m MockMempool) Lock() {}
func (m MockMempool) Unlock() {} func (m MockMempool) Unlock() {}
func (m MockMempool) Size() int { return 0 } func (m MockMempool) Size() int { return 0 }
func (m MockMempool) CheckTx(tx Tx, cb func(*abci.Response)) error { return nil } func (m MockMempool) CheckTx(tx types.Tx, cb func(*abci.Response)) error { return nil }
func (m MockMempool) Reap(n int) Txs { return Txs{} } func (m MockMempool) Reap(n int) types.Txs { return types.Txs{} }
func (m MockMempool) Update(height int64, txs Txs) error { return nil } func (m MockMempool) Update(height int64, txs types.Txs) error { return nil }
func (m MockMempool) Flush() {} func (m MockMempool) Flush() {}
func (m MockMempool) FlushAppConn() error { return nil } func (m MockMempool) FlushAppConn() error { return nil }
func (m MockMempool) TxsAvailable() <-chan int64 { return make(chan int64) } func (m MockMempool) TxsAvailable() <-chan int64 { return make(chan int64) }
@ -53,41 +50,37 @@ func (m MockMempool) EnableTxsAvailable() {}
// blockstore // blockstore
// BlockStoreRPC is the block store interface used by the RPC. // BlockStoreRPC is the block store interface used by the RPC.
// UNSTABLE
type BlockStoreRPC interface { type BlockStoreRPC interface {
Height() int64 Height() int64
LoadBlockMeta(height int64) *BlockMeta LoadBlockMeta(height int64) *types.BlockMeta
LoadBlock(height int64) *Block LoadBlock(height int64) *types.Block
LoadBlockPart(height int64, index int) *Part LoadBlockPart(height int64, index int) *types.Part
LoadBlockCommit(height int64) *Commit LoadBlockCommit(height int64) *types.Commit
LoadSeenCommit(height int64) *Commit LoadSeenCommit(height int64) *types.Commit
} }
// BlockStore defines the BlockStore interface used by the ConsensusState. // BlockStore defines the BlockStore interface used by the ConsensusState.
// UNSTABLE
type BlockStore interface { type BlockStore interface {
BlockStoreRPC BlockStoreRPC
SaveBlock(block *Block, blockParts *PartSet, seenCommit *Commit) SaveBlock(block *types.Block, blockParts *types.PartSet, seenCommit *types.Commit)
} }
//------------------------------------------------------ //-----------------------------------------------------------------------------------------------------
// evidence pool // evidence pool
// EvidencePool defines the EvidencePool interface used by the ConsensusState. // EvidencePool defines the EvidencePool interface used by the ConsensusState.
// UNSTABLE
type EvidencePool interface { type EvidencePool interface {
PendingEvidence() []Evidence PendingEvidence() []types.Evidence
AddEvidence(Evidence) error AddEvidence(types.Evidence) error
Update(*Block) Update(*types.Block, State)
} }
// MockMempool is an empty implementation of a Mempool, useful for testing. // MockMempool is an empty implementation of a Mempool, useful for testing.
// UNSTABLE
type MockEvidencePool struct { type MockEvidencePool struct {
} }
func (m MockEvidencePool) PendingEvidence() []Evidence { return nil } func (m MockEvidencePool) PendingEvidence() []types.Evidence { return nil }
func (m MockEvidencePool) AddEvidence(Evidence) error { return nil } func (m MockEvidencePool) AddEvidence(types.Evidence) error { return nil }
func (m MockEvidencePool) Update(*Block) {} func (m MockEvidencePool) Update(*types.Block, State) {}