fewer structs. remove viper from consensus

This commit is contained in:
Ethan Buchman
2017-05-01 20:09:29 -04:00
parent d8fb226ec4
commit 75b6c5215f
17 changed files with 371 additions and 299 deletions

View File

@ -10,7 +10,6 @@ import (
"time"
"github.com/ebuchman/fail-test"
"github.com/spf13/viper"
"github.com/tendermint/go-wire"
"github.com/tendermint/tendermint/proxy"
@ -20,53 +19,90 @@ import (
)
//-----------------------------------------------------------------------------
// Timeout Parameters
// Config
// TimeoutParams holds timeouts and deltas for each round step.
// All timeouts and deltas in milliseconds.
type TimeoutParams struct {
Propose0 int
ProposeDelta int
Prevote0 int
PrevoteDelta int
Precommit0 int
PrecommitDelta int
Commit0 int
SkipTimeoutCommit bool
// Config holds timeouts and details about the WAL, the block structure,
// and timeouts in the consensus protocol.
type Config struct {
WalFile string `mapstructure:"wal_file"`
WalLight bool `mapstructure:"wal_light"`
// All timeouts are in ms
TimeoutPropose int `mapstructure:"timeout_propose"`
TimeoutProposeDelta int `mapstructure:"timeout_propose_delta"`
TimeoutPrevote int `mapstructure:"timeout_prevote"`
TimeoutPrevoteDelta int `mapstructure:"timeout_prevote_delta"`
TimeoutPrecommit int `mapstructure:"timeout_precommit"`
TimeoutPrecommitDelta int `mapstructure:"timeout_precommit_delta"`
TimeoutCommit int `mapstructure:"timeout_commit"`
// Make progress as soon as we have all the precommits (as if TimeoutCommit = 0)
SkipTimeoutCommit bool `mapstructure:"skip_timeout_commit"`
// BlockSize
MaxBlockSizeTxs int `mapstructure:"block_size_txs"`
MaxBlockSizeBytes int `mapstructure:"block_size_bytes"`
// TODO: This probably shouldn't be exposed but it makes it
// easy to write tests for the wal/replay
BlockPartSize int `mapstructure:"block_part_size"`
chainID string
}
func NewDefaultConfig(rootDir string) *Config {
return &Config{
WalFile: rootDir + "/data/cs.wal/wal",
WalLight: false,
TimeoutPropose: 3000,
TimeoutProposeDelta: 500,
TimeoutPrevote: 1000,
TimeoutPrevoteDelta: 500,
TimeoutPrecommit: 1000,
TimeoutPrecommitDelta: 500,
TimeoutCommit: 1000,
SkipTimeoutCommit: false,
MaxBlockSizeTxs: 10000,
MaxBlockSizeBytes: 1, // TODO
BlockPartSize: types.DefaultBlockPartSize,
}
}
func NewTestConfig(rootDir string) *Config {
config := NewDefaultConfig(rootDir)
config.TimeoutPropose = 2000
config.TimeoutProposeDelta = 1
config.TimeoutPrevote = 10
config.TimeoutPrevoteDelta = 1
config.TimeoutPrecommit = 10
config.TimeoutPrecommitDelta = 1
config.TimeoutCommit = 10
config.SkipTimeoutCommit = true
return config
}
func (cfg *Config) SetChainID(chainID string) {
cfg.chainID = chainID
}
// Wait this long for a proposal
func (tp *TimeoutParams) Propose(round int) time.Duration {
return time.Duration(tp.Propose0+tp.ProposeDelta*round) * time.Millisecond
func (cfg *Config) Propose(round int) time.Duration {
return time.Duration(cfg.TimeoutPropose+cfg.TimeoutProposeDelta*round) * time.Millisecond
}
// After receiving any +2/3 prevote, wait this long for stragglers
func (tp *TimeoutParams) Prevote(round int) time.Duration {
return time.Duration(tp.Prevote0+tp.PrevoteDelta*round) * time.Millisecond
func (cfg *Config) Prevote(round int) time.Duration {
return time.Duration(cfg.TimeoutPrevote+cfg.TimeoutPrevoteDelta*round) * time.Millisecond
}
// After receiving any +2/3 precommits, wait this long for stragglers
func (tp *TimeoutParams) Precommit(round int) time.Duration {
return time.Duration(tp.Precommit0+tp.PrecommitDelta*round) * time.Millisecond
func (cfg *Config) Precommit(round int) time.Duration {
return time.Duration(cfg.TimeoutPrecommit+cfg.TimeoutPrecommitDelta*round) * time.Millisecond
}
// After receiving +2/3 precommits for a single block (a commit), wait this long for stragglers in the next height's RoundStepNewHeight
func (tp *TimeoutParams) Commit(t time.Time) time.Time {
return t.Add(time.Duration(tp.Commit0) * time.Millisecond)
}
// InitTimeoutParamsFromConfig initializes parameters from config
func InitTimeoutParamsFromConfig(config *viper.Viper) *TimeoutParams {
return &TimeoutParams{
Propose0: config.GetInt("timeout_propose"),
ProposeDelta: config.GetInt("timeout_propose_delta"),
Prevote0: config.GetInt("timeout_prevote"),
PrevoteDelta: config.GetInt("timeout_prevote_delta"),
Precommit0: config.GetInt("timeout_precommit"),
PrecommitDelta: config.GetInt("timeout_precommit_delta"),
Commit0: config.GetInt("timeout_commit"),
SkipTimeoutCommit: config.GetBool("skip_timeout_commit"),
}
func (cfg *Config) Commit(t time.Time) time.Time {
return t.Add(time.Duration(cfg.TimeoutCommit) * time.Millisecond)
}
//-----------------------------------------------------------------------------
@ -224,38 +260,48 @@ type PrivValidator interface {
type ConsensusState struct {
cmn.BaseService
config *viper.Viper
// config details
config *Config
privValidator PrivValidator // for signing votes
// services for creating and executing blocks
proxyAppConn proxy.AppConnConsensus
blockStore types.BlockStore
mempool types.Mempool
privValidator PrivValidator // for signing votes
// internal state
mtx sync.Mutex
RoundState
state *sm.State // State until height-1.
peerMsgQueue chan msgInfo // serializes msgs affecting state (proposals, block parts, votes)
internalMsgQueue chan msgInfo // like peerMsgQueue but for our own proposals, parts, votes
timeoutTicker TimeoutTicker // ticker for timeouts
timeoutParams *TimeoutParams // parameters and functions for timeout intervals
// state changes may be triggered by msgs from peers,
// msgs from ourself, or by timeouts
peerMsgQueue chan msgInfo
internalMsgQueue chan msgInfo
timeoutTicker TimeoutTicker
// we use PubSub to trigger msg broadcasts in the reactor,
// and to notify external subscribers, eg. through a websocket
evsw types.EventSwitch
// a Write-Ahead Log ensures we can recover from any kind of crash
// and helps us avoid signing conflicting votes
wal *WAL
replayMode bool // so we don't log signing errors during replay
nSteps int // used for testing to limit the number of transitions the state makes
// for tests where we want to limit the number of transitions the state makes
nSteps int
// allow certain function to be overwritten for testing
// some functions can be overwritten for testing
decideProposal func(height, round int)
doPrevote func(height, round int)
setProposal func(proposal *types.Proposal) error
// closed when we finish shutting down
done chan struct{}
}
func NewConsensusState(config *viper.Viper, state *sm.State, proxyAppConn proxy.AppConnConsensus, blockStore types.BlockStore, mempool types.Mempool) *ConsensusState {
func NewConsensusState(config *Config, state *sm.State, proxyAppConn proxy.AppConnConsensus, blockStore types.BlockStore, mempool types.Mempool) *ConsensusState {
cs := &ConsensusState{
config: config,
proxyAppConn: proxyAppConn,
@ -264,7 +310,6 @@ func NewConsensusState(config *viper.Viper, state *sm.State, proxyAppConn proxy.
peerMsgQueue: make(chan msgInfo, msgQueueSize),
internalMsgQueue: make(chan msgInfo, msgQueueSize),
timeoutTicker: NewTimeoutTicker(),
timeoutParams: InitTimeoutParamsFromConfig(config),
done: make(chan struct{}),
}
// set function defaults (may be overwritten before calling Start)
@ -341,7 +386,7 @@ func (cs *ConsensusState) LoadCommit(height int) *types.Commit {
func (cs *ConsensusState) OnStart() error {
walFile := cs.config.GetString("cs_wal_file")
walFile := cs.config.WalFile
if err := cs.OpenWAL(walFile); err != nil {
log.Error("Error loading ConsensusState wal", "error", err.Error())
return err
@ -406,7 +451,7 @@ func (cs *ConsensusState) OpenWAL(walFile string) (err error) {
cs.mtx.Lock()
defer cs.mtx.Unlock()
wal, err := NewWAL(walFile, cs.config.GetBool("cs_wal_light"))
wal, err := NewWAL(walFile, cs.config.WalLight)
if err != nil {
return err
}
@ -512,7 +557,7 @@ func (cs *ConsensusState) reconstructLastCommit(state *sm.State) {
return
}
seenCommit := cs.blockStore.LoadSeenCommit(state.LastBlockHeight)
lastPrecommits := types.NewVoteSet(cs.config.GetString("chain_id"), state.LastBlockHeight, seenCommit.Round(), types.VoteTypePrecommit, state.LastValidators)
lastPrecommits := types.NewVoteSet(cs.config.chainID, state.LastBlockHeight, seenCommit.Round(), types.VoteTypePrecommit, state.LastValidators)
for _, precommit := range seenCommit.Precommits {
if precommit == nil {
continue
@ -572,9 +617,9 @@ func (cs *ConsensusState) updateToState(state *sm.State) {
// to be gathered for the first block.
// And alternative solution that relies on clocks:
// cs.StartTime = state.LastBlockTime.Add(timeoutCommit)
cs.StartTime = cs.timeoutParams.Commit(time.Now())
cs.StartTime = cs.config.Commit(time.Now())
} else {
cs.StartTime = cs.timeoutParams.Commit(cs.CommitTime)
cs.StartTime = cs.config.Commit(cs.CommitTime)
}
cs.Validators = validators
cs.Proposal = nil
@ -583,7 +628,7 @@ func (cs *ConsensusState) updateToState(state *sm.State) {
cs.LockedRound = 0
cs.LockedBlock = nil
cs.LockedBlockParts = nil
cs.Votes = NewHeightVoteSet(cs.config.GetString("chain_id"), height, validators)
cs.Votes = NewHeightVoteSet(cs.config.chainID, height, validators)
cs.CommitRound = -1
cs.LastCommit = lastPrecommits
cs.LastValidators = state.LastValidators
@ -799,7 +844,7 @@ func (cs *ConsensusState) enterPropose(height int, round int) {
}()
// If we don't get the proposal and all block parts quick enough, enterPrevote
cs.scheduleTimeout(cs.timeoutParams.Propose(round), height, round, RoundStepPropose)
cs.scheduleTimeout(cs.config.Propose(round), height, round, RoundStepPropose)
// Nothing more to do if we're not a validator
if cs.privValidator == nil {
@ -893,10 +938,10 @@ func (cs *ConsensusState) createProposalBlock() (block *types.Block, blockParts
}
// Mempool validated transactions
txs := cs.mempool.Reap(cs.config.GetInt("block_size"))
txs := cs.mempool.Reap(cs.config.MaxBlockSizeTxs)
return types.MakeBlock(cs.Height, cs.state.ChainID, txs, commit,
cs.state.LastBlockID, cs.state.Validators.Hash(), cs.state.AppHash, cs.config.GetInt("block_part_size"))
cs.state.LastBlockID, cs.state.Validators.Hash(), cs.state.AppHash, cs.config.BlockPartSize)
}
// Enter: `timeoutPropose` after entering Propose.
@ -982,7 +1027,7 @@ func (cs *ConsensusState) enterPrevoteWait(height int, round int) {
}()
// Wait for some more prevotes; enterPrecommit
cs.scheduleTimeout(cs.timeoutParams.Prevote(round), height, round, RoundStepPrevoteWait)
cs.scheduleTimeout(cs.config.Prevote(round), height, round, RoundStepPrevoteWait)
}
// Enter: +2/3 precomits for block or nil.
@ -1102,7 +1147,7 @@ func (cs *ConsensusState) enterPrecommitWait(height int, round int) {
}()
// Wait for some more precommits; enterNewRound
cs.scheduleTimeout(cs.timeoutParams.Precommit(round), height, round, RoundStepPrecommitWait)
cs.scheduleTimeout(cs.config.Precommit(round), height, round, RoundStepPrecommitWait)
}
@ -1397,7 +1442,7 @@ func (cs *ConsensusState) addVote(vote *types.Vote, peerKey string) (added bool,
types.FireEventVote(cs.evsw, types.EventDataVote{vote})
// if we can skip timeoutCommit and have all the votes now,
if cs.timeoutParams.SkipTimeoutCommit && cs.LastCommit.HasAll() {
if cs.config.SkipTimeoutCommit && cs.LastCommit.HasAll() {
// go straight to new round (skip timeout commit)
// cs.scheduleTimeout(time.Duration(0), cs.Height, 0, RoundStepNewHeight)
cs.enterNewRound(cs.Height, 0)
@ -1460,7 +1505,7 @@ func (cs *ConsensusState) addVote(vote *types.Vote, peerKey string) (added bool,
cs.enterPrecommit(height, vote.Round)
cs.enterCommit(height, vote.Round)
if cs.timeoutParams.SkipTimeoutCommit && precommits.HasAll() {
if cs.config.SkipTimeoutCommit && precommits.HasAll() {
// if we have all the votes now,
// go straight to new round (skip timeout commit)
// cs.scheduleTimeout(time.Duration(0), cs.Height, 0, RoundStepNewHeight)