diff --git a/blockchain/pool.go b/blockchain/pool.go index ef673b34..32db956c 100644 --- a/blockchain/pool.go +++ b/blockchain/pool.go @@ -63,7 +63,6 @@ func NewBlockPool(start int, requestsCh chan<- BlockRequest, timeoutsCh chan<- s } func (pool *BlockPool) OnStart() error { - pool.BaseService.OnStart() go pool.makeRequestersRoutine() pool.startTime = time.Now() return nil @@ -409,7 +408,6 @@ func newBPRequester(pool *BlockPool, height int) *bpRequester { } func (bpr *bpRequester) OnStart() error { - bpr.BaseService.OnStart() go bpr.requestRoutine() return nil } diff --git a/config/tendermint/config.go b/config/tendermint/config.go index b28170ce..3edf2df1 100644 --- a/config/tendermint/config.go +++ b/config/tendermint/config.go @@ -83,6 +83,9 @@ func GetConfig(rootDir string) cfg.Config { mapConfig.SetDefault("block_size", 10000) // max number of txs mapConfig.SetDefault("block_part_size", 65536) // part size 64K mapConfig.SetDefault("disable_data_hash", false) + + // all timeouts are in ms + mapConfig.SetDefault("timeout_handshake", 10000) mapConfig.SetDefault("timeout_propose", 3000) mapConfig.SetDefault("timeout_propose_delta", 500) mapConfig.SetDefault("timeout_prevote", 1000) @@ -90,6 +93,7 @@ func GetConfig(rootDir string) cfg.Config { mapConfig.SetDefault("timeout_precommit", 1000) mapConfig.SetDefault("timeout_precommit_delta", 500) mapConfig.SetDefault("timeout_commit", 1000) + // make progress asap (no `timeout_commit`) on full precommit votes mapConfig.SetDefault("skip_timeout_commit", false) mapConfig.SetDefault("mempool_recheck", true) diff --git a/config/tendermint_test/config.go b/config/tendermint_test/config.go index 55e3adb4..26a48335 100644 --- a/config/tendermint_test/config.go +++ b/config/tendermint_test/config.go @@ -93,6 +93,7 @@ func ResetConfig(localPath string) cfg.Config { mapConfig.SetDefault("block_size", 10000) mapConfig.SetDefault("block_part_size", 65536) // part size 64K mapConfig.SetDefault("disable_data_hash", false) + mapConfig.SetDefault("timeout_handshake", 10000) mapConfig.SetDefault("timeout_propose", 2000) mapConfig.SetDefault("timeout_propose_delta", 1) mapConfig.SetDefault("timeout_prevote", 10) diff --git a/consensus/replay.go b/consensus/replay.go index 6c4e65a0..731e7d21 100644 --- a/consensus/replay.go +++ b/consensus/replay.go @@ -190,6 +190,8 @@ func (h *Handshaker) NBlocks() int { return h.nBlocks } +var ErrReplayLastBlockTimeout = errors.New("Timed out waiting for last block to be replayed") + // TODO: retry the handshake/replay if it fails ? func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error { // handshake is done via info request on the query conn @@ -207,7 +209,11 @@ func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error { // replay blocks up to the latest in the blockstore _, err = h.ReplayBlocks(appHash, blockHeight, proxyApp) - if err != nil { + if err == ErrReplayLastBlockTimeout { + log.Warn("Failed to sync via handshake. Trying other means. If they fail, please increase the timeout_handshake parameter") + return nil + + } else if err != nil { return errors.New(Fmt("Error on replay: %v", err)) } @@ -286,10 +292,11 @@ func (h *Handshaker) ReplayBlocks(appHash []byte, appBlockHeight int, proxyApp p func (h *Handshaker) replayBlocks(proxyApp proxy.AppConns, appBlockHeight, storeBlockHeight int, useReplayFunc bool) ([]byte, error) { // App is further behind than it should be, so we need to replay blocks. - // We replay all blocks from appBlockHeight+1 to storeBlockHeight-1, - // and let the final block be replayed through ReplayBlocks. + // We replay all blocks from appBlockHeight+1. + // If useReplayFunc == true, stop short of the last block + // so it can be replayed using the WAL in ReplayBlocks. // Note that we don't have an old version of the state, - // so we by-pass state validation using applyBlock here. + // so we by-pass state validation using sm.ApplyBlock. var appHash []byte var err error @@ -328,9 +335,20 @@ func (h *Handshaker) replayLastBlock(proxyApp proxy.AppConnConsensus) ([]byte, e newBlockCh := subscribeToEvent(evsw, "consensus-replay", types.EventStringNewBlock(), 1) // run through the WAL, commit new block, stop - cs.Start() - <-newBlockCh // TODO: use a timeout and return err? - cs.Stop() + if _, err := cs.Start(); err != nil { + return nil, err + } + defer cs.Stop() + + timeout := h.config.GetInt("timeout_handshake") + timer := time.NewTimer(time.Duration(timeout) * time.Millisecond) + log.Notice("Attempting to replay last block", "height", h.store.Height(), "timeout", timeout) + + select { + case <-newBlockCh: + case <-timer.C: + return nil, ErrReplayLastBlockTimeout + } h.nBlocks += 1 diff --git a/consensus/replay_file.go b/consensus/replay_file.go index 6ff38088..5ad1b945 100644 --- a/consensus/replay_file.go +++ b/consensus/replay_file.go @@ -127,8 +127,6 @@ func (pb *playback) replayReset(count int, newStepCh chan interface{}) error { } func (cs *ConsensusState) startForReplay() { - // don't want to start full cs - cs.BaseService.OnStart() log.Warn("Replay commands are disabled until someone updates them and writes tests") /* TODO:! diff --git a/consensus/state.go b/consensus/state.go index 23eaff74..63616cbd 100644 --- a/consensus/state.go +++ b/consensus/state.go @@ -340,16 +340,9 @@ func (cs *ConsensusState) LoadCommit(height int) *types.Commit { } func (cs *ConsensusState) OnStart() error { - cs.BaseService.OnStart() walFile := cs.config.GetString("cs_wal_file") - err := EnsureDir(path.Dir(walFile), 0700) - if err != nil { - log.Error("Error ensuring ConsensusState wal dir", "error", err.Error()) - return err - } - err = cs.OpenWAL(walFile) - if err != nil { + if err := cs.OpenWAL(walFile); err != nil { log.Error("Error loading ConsensusState wal", "error", err.Error()) return err } @@ -364,8 +357,9 @@ func (cs *ConsensusState) OnStart() error { // we may have lost some votes if the process crashed // reload from consensus log to catchup if err := cs.catchupReplay(cs.Height); err != nil { - log.Error("Error on catchup replay", "error", err.Error()) - // let's go for it anyways, maybe we're fine + log.Error("Error on catchup replay. Proceeding to start ConsensusState anyway", "error", err.Error()) + // NOTE: if we ever do return an error here, + // make sure to stop the timeoutTicker } // now start the receiveRoutine @@ -404,6 +398,12 @@ func (cs *ConsensusState) Wait() { // Open file to log all consensus messages and timeouts for deterministic accountability func (cs *ConsensusState) OpenWAL(walFile string) (err error) { + err = EnsureDir(path.Dir(walFile), 0700) + if err != nil { + log.Error("Error ensuring ConsensusState wal dir", "error", err.Error()) + return err + } + cs.mtx.Lock() defer cs.mtx.Unlock() wal, err := NewWAL(walFile, cs.config.GetBool("cs_wal_light")) diff --git a/consensus/ticker.go b/consensus/ticker.go index 06a8f7d2..b318597d 100644 --- a/consensus/ticker.go +++ b/consensus/ticker.go @@ -45,7 +45,6 @@ func NewTimeoutTicker() TimeoutTicker { } func (t *timeoutTicker) OnStart() error { - t.BaseService.OnStart() go t.timeoutRoutine() diff --git a/consensus/wal.go b/consensus/wal.go index 99035ee2..6d8eb381 100644 --- a/consensus/wal.go +++ b/consensus/wal.go @@ -55,7 +55,6 @@ func NewWAL(walFile string, light bool) (*WAL, error) { } func (wal *WAL) OnStart() error { - wal.BaseService.OnStart() size, err := wal.group.Head.Size() if err != nil { return err diff --git a/node/node.go b/node/node.go index f9c69c28..6815c779 100644 --- a/node/node.go +++ b/node/node.go @@ -194,7 +194,6 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, clientCreato } func (n *Node) OnStart() error { - n.BaseService.OnStart() // Create & add listener protocol, address := ProtocolAndAddress(n.config.GetString("node_laddr")) diff --git a/proxy/multi_app_conn.go b/proxy/multi_app_conn.go index 353ade35..81e01aa2 100644 --- a/proxy/multi_app_conn.go +++ b/proxy/multi_app_conn.go @@ -72,7 +72,6 @@ func (app *multiAppConn) Query() AppConnQuery { } func (app *multiAppConn) OnStart() error { - app.BaseService.OnStart() // query connection querycli, err := app.clientCreator.NewABCIClient()