rename WAL#Flush to WAL#FlushAndSync (#3345)

* rename WAL#Flush to WAL#FlushAndSync

- rename auto#Flush to auto#FlushAndSync
- cleanup WAL interface to not leak implementation details!
  * remove Group()
  * add WALReader interface and return it in SearchForEndHeight()
- add interface assertions

Refs #3337

* replace WALReader with io.ReadCloser
This commit is contained in:
Anton Kaliaev
2019-02-25 09:11:07 +04:00
committed by GitHub
parent 6797d85851
commit ec9bff5234
8 changed files with 76 additions and 62 deletions

View File

@@ -19,7 +19,6 @@ import (
abci "github.com/tendermint/tendermint/abci/types"
cfg "github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/crypto"
auto "github.com/tendermint/tendermint/libs/autofile"
dbm "github.com/tendermint/tendermint/libs/db"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/privval"
@@ -201,6 +200,8 @@ type crashingWAL struct {
lastPanickedForMsgIndex int // last message for which we panicked
}
var _ WAL = &crashingWAL{}
// WALWriteError indicates a WAL crash.
type WALWriteError struct {
msg string
@@ -248,15 +249,15 @@ func (w *crashingWAL) WriteSync(m WALMessage) {
w.Write(m)
}
func (w *crashingWAL) Group() *auto.Group { return w.next.Group() }
func (w *crashingWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (gr *auto.GroupReader, found bool, err error) {
func (w *crashingWAL) FlushAndSync() error { return w.next.FlushAndSync() }
func (w *crashingWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (rd io.ReadCloser, found bool, err error) {
return w.next.SearchForEndHeight(height, options)
}
func (w *crashingWAL) Start() error { return w.next.Start() }
func (w *crashingWAL) Stop() error { return w.next.Stop() }
func (w *crashingWAL) Wait() { w.next.Wait() }
func (w *crashingWAL) Flush() error { return w.Group().Flush() }
//------------------------------------------------------------------------------------------
// Handshake Tests

View File

@@ -910,7 +910,7 @@ func (cs *ConsensusState) defaultDecideProposal(height int64, round int) {
}
// Flush the WAL. Otherwise, we may not recompute the same proposal to sign, and the privValidator will refuse to sign anything.
cs.wal.Flush()
cs.wal.FlushAndSync()
// Make proposal
propBlockId := types.BlockID{Hash: block.Hash(), PartsHeader: blockParts.Header()}
@@ -1678,7 +1678,7 @@ func (cs *ConsensusState) addVote(vote *types.Vote, peerID p2p.ID) (added bool,
func (cs *ConsensusState) signVote(type_ types.SignedMsgType, hash []byte, header types.PartSetHeader) (*types.Vote, error) {
// Flush the WAL. Otherwise, we may not recompute the same vote to sign, and the privValidator will refuse to sign anything.
cs.wal.Flush()
cs.wal.FlushAndSync()
addr := cs.privValidator.GetPubKey().Address()
valIndex, _ := cs.Validators.GetByAddress(addr)

View File

@@ -57,10 +57,11 @@ func RegisterWALMessages(cdc *amino.Codec) {
type WAL interface {
Write(WALMessage)
WriteSync(WALMessage)
Group() *auto.Group
SearchForEndHeight(height int64, options *WALSearchOptions) (gr *auto.GroupReader, found bool, err error)
Flush() error
FlushAndSync() error
SearchForEndHeight(height int64, options *WALSearchOptions) (rd io.ReadCloser, found bool, err error)
// service methods
Start() error
Stop() error
Wait()
@@ -82,6 +83,8 @@ type baseWAL struct {
flushInterval time.Duration
}
var _ WAL = &baseWAL{}
// NewWAL returns a new write-ahead logger based on `baseWAL`, which implements
// WAL. It's flushed and synced to disk every 2s and once when stopped.
func NewWAL(walFile string, groupOptions ...func(*auto.Group)) (*baseWAL, error) {
@@ -125,16 +128,19 @@ func (wal *baseWAL) OnStart() error {
wal.WriteSync(EndHeightMessage{0})
}
err = wal.group.Start()
if err != nil {
return err
}
wal.flushTicker = time.NewTicker(wal.flushInterval)
go wal.processFlushTicks()
return err
return nil
}
func (wal *baseWAL) processFlushTicks() {
for {
select {
case <-wal.flushTicker.C:
if err := wal.Flush(); err != nil {
if err := wal.FlushAndSync(); err != nil {
wal.Logger.Error("Periodic WAL flush failed", "err", err)
}
case <-wal.Quit():
@@ -143,9 +149,10 @@ func (wal *baseWAL) processFlushTicks() {
}
}
// Flush will attempt to flush and fsync the underlying group's data to disk.
func (wal *baseWAL) Flush() error {
return wal.group.Flush()
// FlushAndSync flushes and fsync's the underlying group's data to disk.
// See auto#FlushAndSync
func (wal *baseWAL) FlushAndSync() error {
return wal.group.FlushAndSync()
}
// Stop the underlying autofile group.
@@ -153,7 +160,7 @@ func (wal *baseWAL) Flush() error {
// before cleaning up files.
func (wal *baseWAL) OnStop() {
wal.flushTicker.Stop()
wal.Flush()
wal.FlushAndSync()
wal.group.Stop()
wal.group.Close()
}
@@ -187,7 +194,7 @@ func (wal *baseWAL) WriteSync(msg WALMessage) {
}
wal.Write(msg)
if err := wal.Flush(); err != nil {
if err := wal.FlushAndSync(); err != nil {
panic(fmt.Sprintf("Error flushing consensus wal buf to file. Error: %v \n", err))
}
}
@@ -203,8 +210,11 @@ type WALSearchOptions struct {
// Group reader will be nil if found equals false.
//
// CONTRACT: caller must close group reader.
func (wal *baseWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (gr *auto.GroupReader, found bool, err error) {
var msg *TimedWALMessage
func (wal *baseWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (rd io.ReadCloser, found bool, err error) {
var (
msg *TimedWALMessage
gr *auto.GroupReader
)
lastHeightFound := int64(-1)
// NOTE: starting from the last file in the group because we're usually
@@ -371,13 +381,14 @@ func (dec *WALDecoder) Decode() (*TimedWALMessage, error) {
type nilWAL struct{}
var _ WAL = nilWAL{}
func (nilWAL) Write(m WALMessage) {}
func (nilWAL) WriteSync(m WALMessage) {}
func (nilWAL) Group() *auto.Group { return nil }
func (nilWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (gr *auto.GroupReader, found bool, err error) {
func (nilWAL) FlushAndSync() error { return nil }
func (nilWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (rd io.ReadCloser, found bool, err error) {
return nil, false, nil
}
func (nilWAL) Start() error { return nil }
func (nilWAL) Stop() error { return nil }
func (nilWAL) Wait() {}
func (nilWAL) Flush() error { return nil }

View File

@@ -13,7 +13,6 @@ import (
"github.com/tendermint/tendermint/abci/example/kvstore"
bc "github.com/tendermint/tendermint/blockchain"
cfg "github.com/tendermint/tendermint/config"
auto "github.com/tendermint/tendermint/libs/autofile"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/db"
"github.com/tendermint/tendermint/libs/log"
@@ -192,14 +191,12 @@ func (w *byteBufferWAL) WriteSync(m WALMessage) {
w.Write(m)
}
func (w *byteBufferWAL) Group() *auto.Group {
panic("not implemented")
}
func (w *byteBufferWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (gr *auto.GroupReader, found bool, err error) {
func (w *byteBufferWAL) FlushAndSync() error { return nil }
func (w *byteBufferWAL) SearchForEndHeight(height int64, options *WALSearchOptions) (rd io.ReadCloser, found bool, err error) {
return nil, false, nil
}
func (w *byteBufferWAL) Start() error { return nil }
func (w *byteBufferWAL) Stop() error { return nil }
func (w *byteBufferWAL) Wait() {}
func (w *byteBufferWAL) Flush() error { return nil }

View File

@@ -32,8 +32,10 @@ func TestWALTruncate(t *testing.T) {
walFile := filepath.Join(walDir, "wal")
//this magic number 4K can truncate the content when RotateFile. defaultHeadSizeLimit(10M) is hard to simulate.
//this magic number 1 * time.Millisecond make RotateFile check frequently. defaultGroupCheckDuration(5s) is hard to simulate.
// this magic number 4K can truncate the content when RotateFile.
// defaultHeadSizeLimit(10M) is hard to simulate.
// this magic number 1 * time.Millisecond make RotateFile check frequently.
// defaultGroupCheckDuration(5s) is hard to simulate.
wal, err := NewWAL(walFile,
autofile.GroupHeadSizeLimit(4096),
autofile.GroupCheckDuration(1*time.Millisecond),
@@ -49,14 +51,15 @@ func TestWALTruncate(t *testing.T) {
wal.Wait()
}()
//60 block's size nearly 70K, greater than group's headBuf size(4096 * 10), when headBuf is full, truncate content will Flush to the file.
//at this time, RotateFile is called, truncate content exist in each file.
// 60 block's size nearly 70K, greater than group's headBuf size(4096 * 10),
// when headBuf is full, truncate content will Flush to the file. at this
// time, RotateFile is called, truncate content exist in each file.
err = WALGenerateNBlocks(t, wal.Group(), 60)
require.NoError(t, err)
time.Sleep(1 * time.Millisecond) //wait groupCheckDuration, make sure RotateFile run
wal.Group().Flush()
wal.FlushAndSync()
h := int64(50)
gr, found, err := wal.SearchForEndHeight(h, &WALSearchOptions{})