mirror of
https://github.com/fluencelabs/tendermint
synced 2025-06-16 14:41:21 +00:00
Merge pull request #438 from tendermint/replay-fix
consensus: timeout on replayLastBlock
This commit is contained in:
@ -63,7 +63,6 @@ func NewBlockPool(start int, requestsCh chan<- BlockRequest, timeoutsCh chan<- s
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (pool *BlockPool) OnStart() error {
|
func (pool *BlockPool) OnStart() error {
|
||||||
pool.BaseService.OnStart()
|
|
||||||
go pool.makeRequestersRoutine()
|
go pool.makeRequestersRoutine()
|
||||||
pool.startTime = time.Now()
|
pool.startTime = time.Now()
|
||||||
return nil
|
return nil
|
||||||
@ -409,7 +408,6 @@ func newBPRequester(pool *BlockPool, height int) *bpRequester {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (bpr *bpRequester) OnStart() error {
|
func (bpr *bpRequester) OnStart() error {
|
||||||
bpr.BaseService.OnStart()
|
|
||||||
go bpr.requestRoutine()
|
go bpr.requestRoutine()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -83,6 +83,9 @@ func GetConfig(rootDir string) cfg.Config {
|
|||||||
mapConfig.SetDefault("block_size", 10000) // max number of txs
|
mapConfig.SetDefault("block_size", 10000) // max number of txs
|
||||||
mapConfig.SetDefault("block_part_size", 65536) // part size 64K
|
mapConfig.SetDefault("block_part_size", 65536) // part size 64K
|
||||||
mapConfig.SetDefault("disable_data_hash", false)
|
mapConfig.SetDefault("disable_data_hash", false)
|
||||||
|
|
||||||
|
// all timeouts are in ms
|
||||||
|
mapConfig.SetDefault("timeout_handshake", 10000)
|
||||||
mapConfig.SetDefault("timeout_propose", 3000)
|
mapConfig.SetDefault("timeout_propose", 3000)
|
||||||
mapConfig.SetDefault("timeout_propose_delta", 500)
|
mapConfig.SetDefault("timeout_propose_delta", 500)
|
||||||
mapConfig.SetDefault("timeout_prevote", 1000)
|
mapConfig.SetDefault("timeout_prevote", 1000)
|
||||||
@ -90,6 +93,7 @@ func GetConfig(rootDir string) cfg.Config {
|
|||||||
mapConfig.SetDefault("timeout_precommit", 1000)
|
mapConfig.SetDefault("timeout_precommit", 1000)
|
||||||
mapConfig.SetDefault("timeout_precommit_delta", 500)
|
mapConfig.SetDefault("timeout_precommit_delta", 500)
|
||||||
mapConfig.SetDefault("timeout_commit", 1000)
|
mapConfig.SetDefault("timeout_commit", 1000)
|
||||||
|
|
||||||
// make progress asap (no `timeout_commit`) on full precommit votes
|
// make progress asap (no `timeout_commit`) on full precommit votes
|
||||||
mapConfig.SetDefault("skip_timeout_commit", false)
|
mapConfig.SetDefault("skip_timeout_commit", false)
|
||||||
mapConfig.SetDefault("mempool_recheck", true)
|
mapConfig.SetDefault("mempool_recheck", true)
|
||||||
|
@ -93,6 +93,7 @@ func ResetConfig(localPath string) cfg.Config {
|
|||||||
mapConfig.SetDefault("block_size", 10000)
|
mapConfig.SetDefault("block_size", 10000)
|
||||||
mapConfig.SetDefault("block_part_size", 65536) // part size 64K
|
mapConfig.SetDefault("block_part_size", 65536) // part size 64K
|
||||||
mapConfig.SetDefault("disable_data_hash", false)
|
mapConfig.SetDefault("disable_data_hash", false)
|
||||||
|
mapConfig.SetDefault("timeout_handshake", 10000)
|
||||||
mapConfig.SetDefault("timeout_propose", 2000)
|
mapConfig.SetDefault("timeout_propose", 2000)
|
||||||
mapConfig.SetDefault("timeout_propose_delta", 1)
|
mapConfig.SetDefault("timeout_propose_delta", 1)
|
||||||
mapConfig.SetDefault("timeout_prevote", 10)
|
mapConfig.SetDefault("timeout_prevote", 10)
|
||||||
|
@ -190,6 +190,8 @@ func (h *Handshaker) NBlocks() int {
|
|||||||
return h.nBlocks
|
return h.nBlocks
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var ErrReplayLastBlockTimeout = errors.New("Timed out waiting for last block to be replayed")
|
||||||
|
|
||||||
// TODO: retry the handshake/replay if it fails ?
|
// TODO: retry the handshake/replay if it fails ?
|
||||||
func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error {
|
func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error {
|
||||||
// handshake is done via info request on the query conn
|
// handshake is done via info request on the query conn
|
||||||
@ -207,7 +209,11 @@ func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error {
|
|||||||
|
|
||||||
// replay blocks up to the latest in the blockstore
|
// replay blocks up to the latest in the blockstore
|
||||||
_, err = h.ReplayBlocks(appHash, blockHeight, proxyApp)
|
_, err = h.ReplayBlocks(appHash, blockHeight, proxyApp)
|
||||||
if err != nil {
|
if err == ErrReplayLastBlockTimeout {
|
||||||
|
log.Warn("Failed to sync via handshake. Trying other means. If they fail, please increase the timeout_handshake parameter")
|
||||||
|
return nil
|
||||||
|
|
||||||
|
} else if err != nil {
|
||||||
return errors.New(Fmt("Error on replay: %v", err))
|
return errors.New(Fmt("Error on replay: %v", err))
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -286,10 +292,11 @@ func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int, proxyApp p
|
|||||||
|
|
||||||
func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, storeBlockHeight int, useReplayFunc bool) ([]byte, error) {
|
func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, storeBlockHeight int, useReplayFunc bool) ([]byte, error) {
|
||||||
// App is further behind than it should be, so we need to replay blocks.
|
// App is further behind than it should be, so we need to replay blocks.
|
||||||
// We replay all blocks from appBlockHeight+1 to storeBlockHeight-1,
|
// We replay all blocks from appBlockHeight+1.
|
||||||
// and let the final block be replayed through ReplayBlocks.
|
// If useReplayFunc == true, stop short of the last block
|
||||||
|
// so it can be replayed using the WAL in ReplayBlocks.
|
||||||
// Note that we don't have an old version of the state,
|
// Note that we don't have an old version of the state,
|
||||||
// so we by-pass state validation using applyBlock here.
|
// so we by-pass state validation using sm.ApplyBlock.
|
||||||
|
|
||||||
var appHash []byte
|
var appHash []byte
|
||||||
var err error
|
var err error
|
||||||
@ -328,9 +335,20 @@ func (h *Handshaker) replayLastBlock(proxyApp proxy.AppConnConsensus) ([]byte, e
|
|||||||
newBlockCh := subscribeToEvent(evsw, "consensus-replay", types.EventStringNewBlock(), 1)
|
newBlockCh := subscribeToEvent(evsw, "consensus-replay", types.EventStringNewBlock(), 1)
|
||||||
|
|
||||||
// run through the WAL, commit new block, stop
|
// run through the WAL, commit new block, stop
|
||||||
cs.Start()
|
if _, err := cs.Start(); err != nil {
|
||||||
<-newBlockCh // TODO: use a timeout and return err?
|
return nil, err
|
||||||
cs.Stop()
|
}
|
||||||
|
defer cs.Stop()
|
||||||
|
|
||||||
|
timeout := h.config.GetInt("timeout_handshake")
|
||||||
|
timer := time.NewTimer(time.Duration(timeout) * time.Millisecond)
|
||||||
|
log.Notice("Attempting to replay last block", "height", h.store.Height(), "timeout", timeout)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-newBlockCh:
|
||||||
|
case <-timer.C:
|
||||||
|
return nil, ErrReplayLastBlockTimeout
|
||||||
|
}
|
||||||
|
|
||||||
h.nBlocks += 1
|
h.nBlocks += 1
|
||||||
|
|
||||||
|
@ -127,8 +127,6 @@ func (pb *playback) replayReset(count int, newStepCh chan interface{}) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (cs *ConsensusState) startForReplay() {
|
func (cs *ConsensusState) startForReplay() {
|
||||||
// don't want to start full cs
|
|
||||||
cs.BaseService.OnStart()
|
|
||||||
|
|
||||||
log.Warn("Replay commands are disabled until someone updates them and writes tests")
|
log.Warn("Replay commands are disabled until someone updates them and writes tests")
|
||||||
/* TODO:!
|
/* TODO:!
|
||||||
|
@ -340,16 +340,9 @@ func (cs *ConsensusState) LoadCommit(height int) *types.Commit {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (cs *ConsensusState) OnStart() error {
|
func (cs *ConsensusState) OnStart() error {
|
||||||
cs.BaseService.OnStart()
|
|
||||||
|
|
||||||
walFile := cs.config.GetString("cs_wal_file")
|
walFile := cs.config.GetString("cs_wal_file")
|
||||||
err := EnsureDir(path.Dir(walFile), 0700)
|
if err := cs.OpenWAL(walFile); err != nil {
|
||||||
if err != nil {
|
|
||||||
log.Error("Error ensuring ConsensusState wal dir", "error", err.Error())
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
err = cs.OpenWAL(walFile)
|
|
||||||
if err != nil {
|
|
||||||
log.Error("Error loading ConsensusState wal", "error", err.Error())
|
log.Error("Error loading ConsensusState wal", "error", err.Error())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -364,8 +357,9 @@ func (cs *ConsensusState) OnStart() error {
|
|||||||
// we may have lost some votes if the process crashed
|
// we may have lost some votes if the process crashed
|
||||||
// reload from consensus log to catchup
|
// reload from consensus log to catchup
|
||||||
if err := cs.catchupReplay(cs.Height); err != nil {
|
if err := cs.catchupReplay(cs.Height); err != nil {
|
||||||
log.Error("Error on catchup replay", "error", err.Error())
|
log.Error("Error on catchup replay. Proceeding to start ConsensusState anyway", "error", err.Error())
|
||||||
// let's go for it anyways, maybe we're fine
|
// NOTE: if we ever do return an error here,
|
||||||
|
// make sure to stop the timeoutTicker
|
||||||
}
|
}
|
||||||
|
|
||||||
// now start the receiveRoutine
|
// now start the receiveRoutine
|
||||||
@ -404,6 +398,12 @@ func (cs *ConsensusState) Wait() {
|
|||||||
|
|
||||||
// Open file to log all consensus messages and timeouts for deterministic accountability
|
// Open file to log all consensus messages and timeouts for deterministic accountability
|
||||||
func (cs *ConsensusState) OpenWAL(walFile string) (err error) {
|
func (cs *ConsensusState) OpenWAL(walFile string) (err error) {
|
||||||
|
err = EnsureDir(path.Dir(walFile), 0700)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("Error ensuring ConsensusState wal dir", "error", err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
cs.mtx.Lock()
|
cs.mtx.Lock()
|
||||||
defer cs.mtx.Unlock()
|
defer cs.mtx.Unlock()
|
||||||
wal, err := NewWAL(walFile, cs.config.GetBool("cs_wal_light"))
|
wal, err := NewWAL(walFile, cs.config.GetBool("cs_wal_light"))
|
||||||
|
@ -45,7 +45,6 @@ func NewTimeoutTicker() TimeoutTicker {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (t *timeoutTicker) OnStart() error {
|
func (t *timeoutTicker) OnStart() error {
|
||||||
t.BaseService.OnStart()
|
|
||||||
|
|
||||||
go t.timeoutRoutine()
|
go t.timeoutRoutine()
|
||||||
|
|
||||||
|
@ -55,7 +55,6 @@ func NewWAL(walFile string, light bool) (*WAL, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (wal *WAL) OnStart() error {
|
func (wal *WAL) OnStart() error {
|
||||||
wal.BaseService.OnStart()
|
|
||||||
size, err := wal.group.Head.Size()
|
size, err := wal.group.Head.Size()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -194,7 +194,6 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreato
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (n *Node) OnStart() error {
|
func (n *Node) OnStart() error {
|
||||||
n.BaseService.OnStart()
|
|
||||||
|
|
||||||
// Create & add listener
|
// Create & add listener
|
||||||
protocol, address := ProtocolAndAddress(n.config.GetString("node_laddr"))
|
protocol, address := ProtocolAndAddress(n.config.GetString("node_laddr"))
|
||||||
|
@ -72,7 +72,6 @@ func (app *multiAppConn) Query() AppConnQuery {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (app *multiAppConn) OnStart() error {
|
func (app *multiAppConn) OnStart() error {
|
||||||
app.BaseService.OnStart()
|
|
||||||
|
|
||||||
// query connection
|
// query connection
|
||||||
querycli, err := app.clientCreator.NewABCIClient()
|
querycli, err := app.clientCreator.NewABCIClient()
|
||||||
|
Reference in New Issue
Block a user