handle data corruption errors

Refs #573
This commit is contained in:
Anton Kaliaev 2017-12-11 19:48:20 -06:00
parent 5c58db3bb4
commit 40f9261d48
No known key found for this signature in database
GPG Key ID: 7B6881D965918214
5 changed files with 54 additions and 21 deletions

View File

@ -2,7 +2,6 @@ package consensus
import ( import (
"bytes" "bytes"
"errors"
"fmt" "fmt"
"hash/crc32" "hash/crc32"
"io" "io"
@ -95,10 +94,13 @@ func (cs *ConsensusState) catchupReplay(csHeight int64) error {
cs.replayMode = true cs.replayMode = true
defer func() { cs.replayMode = false }() defer func() { cs.replayMode = false }()
// Ensure that ENDHEIGHT for this height doesn't exist // Ensure that ENDHEIGHT for this height doesn't exist.
// NOTE: This is just a sanity check. As far as we know things work fine without it, // NOTE: This is just a sanity check. As far as we know things work fine
// and Handshake could reuse ConsensusState if it weren't for this check (since we can crash after writing ENDHEIGHT). // without it, and Handshake could reuse ConsensusState if it weren't for
gr, found, err := cs.wal.SearchForEndHeight(csHeight) // this check (since we can crash after writing ENDHEIGHT).
//
// Ignore data corruption errors since this is a sanity check.
gr, found, err := cs.wal.SearchForEndHeight(csHeight, &WALSearchOptions{IgnoreDataCorruptionErrors: true})
if err != nil { if err != nil {
return err return err
} }
@ -112,14 +114,16 @@ func (cs *ConsensusState) catchupReplay(csHeight int64) error {
} }
// Search for last height marker // Search for last height marker
gr, found, err = cs.wal.SearchForEndHeight(csHeight - 1) //
// Ignore data corruption errors in previous heights because we only care about last height
gr, found, err = cs.wal.SearchForEndHeight(csHeight-1, &WALSearchOptions{IgnoreDataCorruptionErrors: true})
if err == io.EOF { if err == io.EOF {
cs.Logger.Error("Replay: wal.group.Search returned EOF", "#ENDHEIGHT", csHeight-1) cs.Logger.Error("Replay: wal.group.Search returned EOF", "#ENDHEIGHT", csHeight-1)
} else if err != nil { } else if err != nil {
return err return err
} }
if !found { if !found {
return errors.New(cmn.Fmt("Cannot replay height %d. WAL does not contain #ENDHEIGHT for %d.", csHeight, csHeight-1)) return fmt.Errorf("Cannot replay height %d. WAL does not contain #ENDHEIGHT for %d.", csHeight, csHeight-1)
} }
defer gr.Close() // nolint: errcheck defer gr.Close() // nolint: errcheck
@ -132,9 +136,13 @@ func (cs *ConsensusState) catchupReplay(csHeight int64) error {
msg, err = dec.Decode() msg, err = dec.Decode()
if err == io.EOF { if err == io.EOF {
break break
} else if IsDataCorruptionError(err) {
cs.Logger.Debug("data has been corrupted in last height of consensus WAL", "err", err, "height", csHeight)
cmn.PanicCrisis(fmt.Sprintf("data has been corrupted (%v) in last height %d of consensus WAL", err, csHeight))
} else if err != nil { } else if err != nil {
return err return err
} }
// NOTE: since the priv key is set when the msgs are received // NOTE: since the priv key is set when the msgs are received
// it will attempt to eg double sign but we can just ignore it // it will attempt to eg double sign but we can just ignore it
// since the votes will be replayed and we'll get to the next step // since the votes will be replayed and we'll get to the next step
@ -202,7 +210,7 @@ func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error {
// handshake is done via info request on the query conn // handshake is done via info request on the query conn
res, err := proxyApp.Query().InfoSync(abci.RequestInfo{version.Version}) res, err := proxyApp.Query().InfoSync(abci.RequestInfo{version.Version})
if err != nil { if err != nil {
return errors.New(cmn.Fmt("Error calling Info: %v", err)) return fmt.Errorf("Error calling Info: %v", err)
} }
blockHeight := int64(res.LastBlockHeight) blockHeight := int64(res.LastBlockHeight)
@ -218,7 +226,7 @@ func (h *Handshaker) Handshake(proxyApp proxy.AppConns) error {
// replay blocks up to the latest in the blockstore // replay blocks up to the latest in the blockstore
_, err = h.ReplayBlocks(appHash, blockHeight, proxyApp) _, err = h.ReplayBlocks(appHash, blockHeight, proxyApp)
if err != nil { if err != nil {
return errors.New(cmn.Fmt("Error on replay: %v", err)) return fmt.Errorf("Error on replay: %v", err)
} }
h.logger.Info("Completed ABCI Handshake - Tendermint and App are synced", "appHeight", blockHeight, "appHash", fmt.Sprintf("%X", appHash)) h.logger.Info("Completed ABCI Handshake - Tendermint and App are synced", "appHeight", blockHeight, "appHash", fmt.Sprintf("%X", appHash))
@ -358,7 +366,7 @@ func (h *Handshaker) replayBlock(height int64, proxyApp proxy.AppConnConsensus)
func (h *Handshaker) checkAppHash(appHash []byte) error { func (h *Handshaker) checkAppHash(appHash []byte) error {
if !bytes.Equal(h.state.AppHash, appHash) { if !bytes.Equal(h.state.AppHash, appHash) {
panic(errors.New(cmn.Fmt("Tendermint state.AppHash does not match AppHash after replay. Got %X, expected %X", appHash, h.state.AppHash)).Error()) panic(fmt.Errorf("Tendermint state.AppHash does not match AppHash after replay. Got %X, expected %X", appHash, h.state.AppHash).Error())
} }
return nil return nil
} }

View File

@ -253,8 +253,8 @@ func (w *crashingWAL) Save(m WALMessage) {
} }
func (w *crashingWAL) Group() *auto.Group { return w.next.Group() } func (w *crashingWAL) Group() *auto.Group { return w.next.Group() }
func (w *crashingWAL) SearchForEndHeight(height int64) (gr *auto.GroupReader, found bool, err error) { func (w *crashingWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (gr *auto.GroupReader, found bool, err error) {
return w.next.SearchForEndHeight(height) return w.next.SearchForEndHeight(height, options)
} }
func (w *crashingWAL) Start() error { return w.next.Start() } func (w *crashingWAL) Start() error { return w.next.Start() }
@ -485,7 +485,7 @@ func buildTMStateFromChain(config *cfg.Config, state *sm.State, chain []*types.B
func makeBlockchainFromWAL(wal WAL) ([]*types.Block, []*types.Commit, error) { func makeBlockchainFromWAL(wal WAL) ([]*types.Block, []*types.Commit, error) {
// Search for height marker // Search for height marker
gr, found, err := wal.SearchForEndHeight(0) gr, found, err := wal.SearchForEndHeight(0, &WALSearchOptions{})
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }

View File

@ -48,7 +48,7 @@ var _ = wire.RegisterInterface(
type WAL interface { type WAL interface {
Save(WALMessage) Save(WALMessage)
Group() *auto.Group Group() *auto.Group
SearchForEndHeight(height int64) (gr *auto.GroupReader, found bool, err error) SearchForEndHeight(height int64, options *WALSearchOptions) (gr *auto.GroupReader, found bool, err error)
Start() error Start() error
Stop() error Stop() error
@ -133,12 +133,17 @@ func (wal *baseWAL) Save(msg WALMessage) {
} }
} }
// WALSearchOptions are optional arguments to SearchForEndHeight.
type WALSearchOptions struct {
// IgnoreDataCorruptionErrors, when true, keeps baseWAL.SearchForEndHeight
// from aborting with an error when a decode failure is classified as data
// corruption by IsDataCorruptionError; other errors are still returned.
IgnoreDataCorruptionErrors bool
}
// SearchForEndHeight searches for the EndHeightMessage with the height and // SearchForEndHeight searches for the EndHeightMessage with the height and
// returns an auto.GroupReader, whether it was found or not and an error. // returns an auto.GroupReader, whether it was found or not and an error.
// Group reader will be nil if found equals false. // Group reader will be nil if found equals false.
// //
// CONTRACT: caller must close group reader. // CONTRACT: caller must close group reader.
func (wal *baseWAL) SearchForEndHeight(height int64) (gr *auto.GroupReader, found bool, err error) { func (wal *baseWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (gr *auto.GroupReader, found bool, err error) {
var msg *TimedWALMessage var msg *TimedWALMessage
// NOTE: starting from the last file in the group because we're usually // NOTE: starting from the last file in the group because we're usually
@ -158,7 +163,9 @@ func (wal *baseWAL) SearchForEndHeight(height int64) (gr *auto.GroupReader, foun
// check next file // check next file
break break
} }
if err != nil { if options.IgnoreDataCorruptionErrors && IsDataCorruptionError(err) {
// do nothing
} else if err != nil {
gr.Close() gr.Close()
return nil, false, err return nil, false, err
} }
@ -210,6 +217,24 @@ func (enc *WALEncoder) Encode(v *TimedWALMessage) error {
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
// IsDataCorruptionError reports whether err's dynamic type is
// DataCorruptionError, i.e. whether it signals corrupted data inside the WAL.
// A nil error, or any other error type (including a pointer to
// DataCorruptionError), yields false.
func IsDataCorruptionError(err error) bool {
	switch err.(type) {
	case DataCorruptionError:
		return true
	default:
		return false
	}
}
// DataCorruptionError wraps an error caused by corrupted data inside the WAL,
// such as a checksum mismatch or a record that fails to decode.
type DataCorruptionError struct {
	cause error
}

// Error implements the error interface. The wrapped cause is rendered with
// its default formatting inside a "DataCorruptionError[...]" envelope.
func (dce DataCorruptionError) Error() string {
	return "DataCorruptionError[" + fmt.Sprint(dce.cause) + "]"
}

// Cause returns the wrapped underlying error (nil if none was set).
func (dce DataCorruptionError) Cause() error {
	return dce.cause
}
// A WALDecoder reads and decodes custom-encoded WAL messages from an input // A WALDecoder reads and decodes custom-encoded WAL messages from an input
// stream. See WALEncoder for the format used. // stream. See WALEncoder for the format used.
// //
@ -259,14 +284,14 @@ func (dec *WALDecoder) Decode() (*TimedWALMessage, error) {
// check checksum before decoding data // check checksum before decoding data
actualCRC := crc32.Checksum(data, crc32c) actualCRC := crc32.Checksum(data, crc32c)
if actualCRC != crc { if actualCRC != crc {
return nil, fmt.Errorf("checksums do not match: (read: %v, actual: %v)", crc, actualCRC) return nil, DataCorruptionError{fmt.Errorf("checksums do not match: (read: %v, actual: %v)", crc, actualCRC)}
} }
var nn int var nn int
var res *TimedWALMessage // nolint: gosimple var res *TimedWALMessage // nolint: gosimple
res = wire.ReadBinary(&TimedWALMessage{}, bytes.NewBuffer(data), int(length), &nn, &err).(*TimedWALMessage) res = wire.ReadBinary(&TimedWALMessage{}, bytes.NewBuffer(data), int(length), &nn, &err).(*TimedWALMessage)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to decode data: %v", err) return nil, DataCorruptionError{fmt.Errorf("failed to decode data: %v", err)}
} }
return res, err return res, err
@ -276,7 +301,7 @@ type nilWAL struct{}
func (nilWAL) Save(m WALMessage) {} func (nilWAL) Save(m WALMessage) {}
func (nilWAL) Group() *auto.Group { return nil } func (nilWAL) Group() *auto.Group { return nil }
func (nilWAL) SearchForEndHeight(height int64) (gr *auto.GroupReader, found bool, err error) { func (nilWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (gr *auto.GroupReader, found bool, err error) {
return nil, false, nil return nil, false, nil
} }
func (nilWAL) Start() error { return nil } func (nilWAL) Start() error { return nil }

View File

@ -172,7 +172,7 @@ func (w *byteBufferWAL) Save(m WALMessage) {
func (w *byteBufferWAL) Group() *auto.Group { func (w *byteBufferWAL) Group() *auto.Group {
panic("not implemented") panic("not implemented")
} }
func (w *byteBufferWAL) SearchForEndHeight(height int64) (gr *auto.GroupReader, found bool, err error) { func (w *byteBufferWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (gr *auto.GroupReader, found bool, err error) {
return nil, false, nil return nil, false, nil
} }

View File

@ -54,7 +54,7 @@ func TestSearchForEndHeight(t *testing.T) {
} }
h := int64(3) h := int64(3)
gr, found, err := wal.SearchForEndHeight(h) gr, found, err := wal.SearchForEndHeight(h, &WALSearchOptions{})
assert.NoError(t, err, cmn.Fmt("expected not to err on height %d", h)) assert.NoError(t, err, cmn.Fmt("expected not to err on height %d", h))
assert.True(t, found, cmn.Fmt("expected to find end height for %d", h)) assert.True(t, found, cmn.Fmt("expected to find end height for %d", h))
assert.NotNil(t, gr, "expected group not to be nil") assert.NotNil(t, gr, "expected group not to be nil")