Thane Thomson dff3deb2a9 cs: sync WAL more frequently (#3300)
As per #3043, this adds a ticker to sync the WAL every 2s while the WAL is running.

* Flush WAL every 2s

This adds a ticker that flushes the WAL every 2s while the WAL is
running. This is related to #3043.

* Fix spelling

* Increase timeout to 2mins for slower build environments

* Make WAL sync interval configurable

* Add TODO to replace testChan with more comprehensive testBus

* Remove extraneous debug statement

* Remove testChan in favour of using system time

As per
https://github.com/tendermint/tendermint/pull/3300#discussion_r255886586,
this removes the `testChan` WAL member and replaces the approach with a
system time-oriented one. In this new approach, we keep track of the
system time at which each flush and periodic flush successfully
occurred.

The naming of the various functions is also updated here to be more
consistent with "flushing" as opposed to "sync'ing".

* Update naming convention and ensure lock for timestamp update

* Add Flush method as part of WAL interface

Adds a `Flush` method as part of the WAL interface to enforce the idea
that we can manually trigger a WAL flush from outside of the WAL. This
is employed in the consensus state management to flush the WAL prior to
signing votes/proposals, as per https://github.com/tendermint/tendermint/issues/3043#issuecomment-453853630

* Update CHANGELOG_PENDING

* Remove mutex approach and replace with DI

The dependency injection approach to dealing with testing concerns could
allow similar effects to some kind of "testing bus"-based approach. This
commit introduces an example of this, where instead of relying on
(potentially fragile) timing of things between the code and the test, we
inject code into the function under test that can signal the test
through a channel.

This allows us to avoid the `time.Sleep()`-based approach previously
employed.

* Update comment on WAL flushing during vote signing

Co-Authored-By: thanethomson <connect@thanethomson.com>

* Simplify flush interval definition

Co-Authored-By: thanethomson <connect@thanethomson.com>

* Expand commentary on WAL disk flushing

Co-Authored-By: thanethomson <connect@thanethomson.com>

* Add broken test to illustrate WAL sync test problem

Removes test-related state (dependency injection code) from the WAL data
structure and adds test code to illustrate the problem with using
`WALGenerateNBlocks` and `wal.SearchForEndHeight` to test periodic
sync'ing.

* Fix test error messages

* Use WAL group buffer size to check for flush

A function is added to `libs/autofile/group.go#Group` in order to return
the size of the buffered data (i.e. data that has not yet been flushed
to disk). The test now checks that, prior to a `time.Sleep`, the group
buffer has data in it. After the `time.Sleep` (during which time the
periodic flush should have been called), the buffer should be empty.

* Remove config root dir removal from #3291

* Add godoc for NewWAL mentioning periodic sync
2019-02-20 09:45:18 +04:00

783 lines
18 KiB
Go

package autofile
import (
"bufio"
"errors"
"fmt"
"io"
"os"
"path"
"path/filepath"
"regexp"
"strconv"
"strings"
"sync"
"time"
cmn "github.com/tendermint/tendermint/libs/common"
)
const (
defaultGroupCheckDuration = 5000 * time.Millisecond
defaultHeadSizeLimit = 10 * 1024 * 1024 // 10MB
defaultTotalSizeLimit = 1 * 1024 * 1024 * 1024 // 1GB
maxFilesToRemove = 4 // needs to be greater than 1
)
/*
You can open a Group to keep restrictions on an AutoFile, like
the maximum size of each chunk, and/or the total amount of bytes
stored in the group.
The first file to be written in the Group.Dir is the head file.
Dir/
- <HeadPath>
Once the Head file reaches the size limit, it will be rotated.
Dir/
- <HeadPath>.000 // First rolled file
- <HeadPath> // New head path, starts empty.
// The implicit index is 001.
As more files are written, the index numbers grow...
Dir/
- <HeadPath>.000 // First rolled file
- <HeadPath>.001 // Second rolled file
- ...
- <HeadPath> // New head path
The Group can also be used to binary-search for some line,
assuming that marker lines are written occasionally.
*/
type Group struct {
cmn.BaseService
ID string
Head *AutoFile // The head AutoFile to write to
headBuf *bufio.Writer
Dir string // Directory that contains .Head
ticker *time.Ticker
mtx sync.Mutex
headSizeLimit int64
totalSizeLimit int64
groupCheckDuration time.Duration
minIndex int // Includes head
maxIndex int // Includes head, where Head will move to
// close this when the processTicks routine is done.
// this ensures we can cleanup the dir after calling Stop
// and the routine won't be trying to access it anymore
doneProcessTicks chan struct{}
// TODO: When we start deleting files, we need to start tracking GroupReaders
// and their dependencies.
}
// OpenGroup creates a new Group with head at headPath. It returns an error if
// it fails to open head file.
func OpenGroup(headPath string, groupOptions ...func(*Group)) (g *Group, err error) {
dir := path.Dir(headPath)
head, err := OpenAutoFile(headPath)
if err != nil {
return nil, err
}
g = &Group{
ID: "group:" + head.ID,
Head: head,
headBuf: bufio.NewWriterSize(head, 4096*10),
Dir: dir,
headSizeLimit: defaultHeadSizeLimit,
totalSizeLimit: defaultTotalSizeLimit,
groupCheckDuration: defaultGroupCheckDuration,
minIndex: 0,
maxIndex: 0,
doneProcessTicks: make(chan struct{}),
}
for _, option := range groupOptions {
option(g)
}
g.BaseService = *cmn.NewBaseService(nil, "Group", g)
gInfo := g.readGroupInfo()
g.minIndex = gInfo.MinIndex
g.maxIndex = gInfo.MaxIndex
return
}
// GroupCheckDuration allows you to overwrite default groupCheckDuration.
func GroupCheckDuration(duration time.Duration) func(*Group) {
return func(g *Group) {
g.groupCheckDuration = duration
}
}
// GroupHeadSizeLimit allows you to overwrite default head size limit - 10MB.
func GroupHeadSizeLimit(limit int64) func(*Group) {
return func(g *Group) {
g.headSizeLimit = limit
}
}
// GroupTotalSizeLimit allows you to overwrite default total size limit of the group - 1GB.
func GroupTotalSizeLimit(limit int64) func(*Group) {
return func(g *Group) {
g.totalSizeLimit = limit
}
}
// OnStart implements Service by starting the goroutine that checks file and
// group limits.
func (g *Group) OnStart() error {
g.ticker = time.NewTicker(g.groupCheckDuration)
go g.processTicks()
return nil
}
// OnStop implements Service by stopping the goroutine described above.
// NOTE: g.Head must be closed separately using Close.
func (g *Group) OnStop() {
g.ticker.Stop()
g.Flush() // flush any uncommitted data
}
func (g *Group) Wait() {
// wait for processTicks routine to finish
<-g.doneProcessTicks
}
// Close closes the head file. The group must be stopped by this moment.
func (g *Group) Close() {
g.Flush() // flush any uncommitted data
g.mtx.Lock()
_ = g.Head.closeFile()
g.mtx.Unlock()
}
// HeadSizeLimit returns the current head size limit.
func (g *Group) HeadSizeLimit() int64 {
g.mtx.Lock()
defer g.mtx.Unlock()
return g.headSizeLimit
}
// TotalSizeLimit returns total size limit of the group.
func (g *Group) TotalSizeLimit() int64 {
g.mtx.Lock()
defer g.mtx.Unlock()
return g.totalSizeLimit
}
// MaxIndex returns index of the last file in the group.
func (g *Group) MaxIndex() int {
g.mtx.Lock()
defer g.mtx.Unlock()
return g.maxIndex
}
// MinIndex returns index of the first file in the group.
func (g *Group) MinIndex() int {
g.mtx.Lock()
defer g.mtx.Unlock()
return g.minIndex
}
// Write writes the contents of p into the current head of the group. It
// returns the number of bytes written. If nn < len(p), it also returns an
// error explaining why the write is short.
// NOTE: Writes are buffered so they don't write synchronously
// TODO: Make it halt if space is unavailable
func (g *Group) Write(p []byte) (nn int, err error) {
g.mtx.Lock()
defer g.mtx.Unlock()
return g.headBuf.Write(p)
}
// WriteLine writes line into the current head of the group. It also appends "\n".
// NOTE: Writes are buffered so they don't write synchronously
// TODO: Make it halt if space is unavailable
func (g *Group) WriteLine(line string) error {
g.mtx.Lock()
defer g.mtx.Unlock()
_, err := g.headBuf.Write([]byte(line + "\n"))
return err
}
// Buffered returns the size of the currently buffered data.
func (g *Group) Buffered() int {
g.mtx.Lock()
defer g.mtx.Unlock()
return g.headBuf.Buffered()
}
// Flush writes any buffered data to the underlying file and commits the
// current content of the file to stable storage.
func (g *Group) Flush() error {
g.mtx.Lock()
defer g.mtx.Unlock()
err := g.headBuf.Flush()
if err == nil {
err = g.Head.Sync()
}
return err
}
func (g *Group) processTicks() {
defer close(g.doneProcessTicks)
for {
select {
case <-g.ticker.C:
g.checkHeadSizeLimit()
g.checkTotalSizeLimit()
case <-g.Quit():
return
}
}
}
// NOTE: this function is called manually in tests.
func (g *Group) checkHeadSizeLimit() {
limit := g.HeadSizeLimit()
if limit == 0 {
return
}
size, err := g.Head.Size()
if err != nil {
g.Logger.Error("Group's head may grow without bound", "head", g.Head.Path, "err", err)
return
}
if size >= limit {
g.RotateFile()
}
}
func (g *Group) checkTotalSizeLimit() {
limit := g.TotalSizeLimit()
if limit == 0 {
return
}
gInfo := g.readGroupInfo()
totalSize := gInfo.TotalSize
for i := 0; i < maxFilesToRemove; i++ {
index := gInfo.MinIndex + i
if totalSize < limit {
return
}
if index == gInfo.MaxIndex {
// Special degenerate case, just do nothing.
g.Logger.Error("Group's head may grow without bound", "head", g.Head.Path)
return
}
pathToRemove := filePathForIndex(g.Head.Path, index, gInfo.MaxIndex)
fInfo, err := os.Stat(pathToRemove)
if err != nil {
g.Logger.Error("Failed to fetch info for file", "file", pathToRemove)
continue
}
err = os.Remove(pathToRemove)
if err != nil {
g.Logger.Error("Failed to remove path", "path", pathToRemove)
return
}
totalSize -= fInfo.Size()
}
}
// RotateFile causes group to close the current head and assign it some index.
// Note it does not create a new head.
func (g *Group) RotateFile() {
g.mtx.Lock()
defer g.mtx.Unlock()
headPath := g.Head.Path
if err := g.headBuf.Flush(); err != nil {
panic(err)
}
if err := g.Head.Sync(); err != nil {
panic(err)
}
if err := g.Head.closeFile(); err != nil {
panic(err)
}
indexPath := filePathForIndex(headPath, g.maxIndex, g.maxIndex+1)
if err := os.Rename(headPath, indexPath); err != nil {
panic(err)
}
g.maxIndex++
}
// NewReader returns a new group reader.
// CONTRACT: Caller must close the returned GroupReader.
func (g *Group) NewReader(index int) (*GroupReader, error) {
r := newGroupReader(g)
err := r.SetIndex(index)
if err != nil {
return nil, err
}
return r, nil
}
// Returns -1 if line comes after, 0 if found, 1 if line comes before.
type SearchFunc func(line string) (int, error)
// Searches for the right file in Group, then returns a GroupReader to start
// streaming lines.
// Returns true if an exact match was found, otherwise returns the next greater
// line that starts with prefix.
// CONTRACT: Caller must close the returned GroupReader
func (g *Group) Search(prefix string, cmp SearchFunc) (*GroupReader, bool, error) {
g.mtx.Lock()
minIndex, maxIndex := g.minIndex, g.maxIndex
g.mtx.Unlock()
// Now minIndex/maxIndex may change meanwhile,
// but it shouldn't be a big deal
// (maybe we'll want to limit scanUntil though)
for {
curIndex := (minIndex + maxIndex + 1) / 2
// Base case, when there's only 1 choice left.
if minIndex == maxIndex {
r, err := g.NewReader(maxIndex)
if err != nil {
return nil, false, err
}
match, err := scanUntil(r, prefix, cmp)
if err != nil {
r.Close()
return nil, false, err
}
return r, match, err
}
// Read starting roughly at the middle file,
// until we find line that has prefix.
r, err := g.NewReader(curIndex)
if err != nil {
return nil, false, err
}
foundIndex, line, err := scanNext(r, prefix)
r.Close()
if err != nil {
return nil, false, err
}
// Compare this line to our search query.
val, err := cmp(line)
if err != nil {
return nil, false, err
}
if val < 0 {
// Line will come later
minIndex = foundIndex
} else if val == 0 {
// Stroke of luck, found the line
r, err := g.NewReader(foundIndex)
if err != nil {
return nil, false, err
}
match, err := scanUntil(r, prefix, cmp)
if !match {
panic("Expected match to be true")
}
if err != nil {
r.Close()
return nil, false, err
}
return r, true, err
} else {
// We passed it
maxIndex = curIndex - 1
}
}
}
// Scans and returns the first line that starts with 'prefix'
// Consumes line and returns it.
func scanNext(r *GroupReader, prefix string) (int, string, error) {
for {
line, err := r.ReadLine()
if err != nil {
return 0, "", err
}
if !strings.HasPrefix(line, prefix) {
continue
}
index := r.CurIndex()
return index, line, nil
}
}
// Returns true iff an exact match was found.
// Pushes line, does not consume it.
func scanUntil(r *GroupReader, prefix string, cmp SearchFunc) (bool, error) {
for {
line, err := r.ReadLine()
if err != nil {
return false, err
}
if !strings.HasPrefix(line, prefix) {
continue
}
val, err := cmp(line)
if err != nil {
return false, err
}
if val < 0 {
continue
} else if val == 0 {
r.PushLine(line)
return true, nil
} else {
r.PushLine(line)
return false, nil
}
}
}
// Searches backwards for the last line in Group with prefix.
// Scans each file forward until the end to find the last match.
func (g *Group) FindLast(prefix string) (match string, found bool, err error) {
g.mtx.Lock()
minIndex, maxIndex := g.minIndex, g.maxIndex
g.mtx.Unlock()
r, err := g.NewReader(maxIndex)
if err != nil {
return "", false, err
}
defer r.Close()
// Open files from the back and read
GROUP_LOOP:
for i := maxIndex; i >= minIndex; i-- {
err := r.SetIndex(i)
if err != nil {
return "", false, err
}
// Scan each line and test whether line matches
for {
line, err := r.ReadLine()
if err == io.EOF {
if found {
return match, found, nil
}
continue GROUP_LOOP
} else if err != nil {
return "", false, err
}
if strings.HasPrefix(line, prefix) {
match = line
found = true
}
if r.CurIndex() > i {
if found {
return match, found, nil
}
continue GROUP_LOOP
}
}
}
return
}
// GroupInfo holds information about the group.
type GroupInfo struct {
MinIndex int // index of the first file in the group, including head
MaxIndex int // index of the last file in the group, including head
TotalSize int64 // total size of the group
HeadSize int64 // size of the head
}
// Returns info after scanning all files in g.Head's dir.
func (g *Group) ReadGroupInfo() GroupInfo {
g.mtx.Lock()
defer g.mtx.Unlock()
return g.readGroupInfo()
}
// Index includes the head.
// CONTRACT: caller should have called g.mtx.Lock
func (g *Group) readGroupInfo() GroupInfo {
groupDir := filepath.Dir(g.Head.Path)
headBase := filepath.Base(g.Head.Path)
var minIndex, maxIndex int = -1, -1
var totalSize, headSize int64 = 0, 0
dir, err := os.Open(groupDir)
if err != nil {
panic(err)
}
defer dir.Close()
fiz, err := dir.Readdir(0)
if err != nil {
panic(err)
}
// For each file in the directory, filter by pattern
for _, fileInfo := range fiz {
if fileInfo.Name() == headBase {
fileSize := fileInfo.Size()
totalSize += fileSize
headSize = fileSize
continue
} else if strings.HasPrefix(fileInfo.Name(), headBase) {
fileSize := fileInfo.Size()
totalSize += fileSize
indexedFilePattern := regexp.MustCompile(`^.+\.([0-9]{3,})$`)
submatch := indexedFilePattern.FindSubmatch([]byte(fileInfo.Name()))
if len(submatch) != 0 {
// Matches
fileIndex, err := strconv.Atoi(string(submatch[1]))
if err != nil {
panic(err)
}
if maxIndex < fileIndex {
maxIndex = fileIndex
}
if minIndex == -1 || fileIndex < minIndex {
minIndex = fileIndex
}
}
}
}
// Now account for the head.
if minIndex == -1 {
// If there were no numbered files,
// then the head is index 0.
minIndex, maxIndex = 0, 0
} else {
// Otherwise, the head file is 1 greater
maxIndex++
}
return GroupInfo{minIndex, maxIndex, totalSize, headSize}
}
func filePathForIndex(headPath string, index int, maxIndex int) string {
if index == maxIndex {
return headPath
}
return fmt.Sprintf("%v.%03d", headPath, index)
}
//--------------------------------------------------------------------------------
// GroupReader provides an interface for reading from a Group.
type GroupReader struct {
*Group
mtx sync.Mutex
curIndex int
curFile *os.File
curReader *bufio.Reader
curLine []byte
}
func newGroupReader(g *Group) *GroupReader {
return &GroupReader{
Group: g,
curIndex: 0,
curFile: nil,
curReader: nil,
curLine: nil,
}
}
// Close closes the GroupReader by closing the cursor file.
func (gr *GroupReader) Close() error {
gr.mtx.Lock()
defer gr.mtx.Unlock()
if gr.curReader != nil {
err := gr.curFile.Close()
gr.curIndex = 0
gr.curReader = nil
gr.curFile = nil
gr.curLine = nil
return err
}
return nil
}
// Read implements io.Reader, reading bytes from the current Reader
// incrementing index until enough bytes are read.
func (gr *GroupReader) Read(p []byte) (n int, err error) {
lenP := len(p)
if lenP == 0 {
return 0, errors.New("given empty slice")
}
gr.mtx.Lock()
defer gr.mtx.Unlock()
// Open file if not open yet
if gr.curReader == nil {
if err = gr.openFile(gr.curIndex); err != nil {
return 0, err
}
}
// Iterate over files until enough bytes are read
var nn int
for {
nn, err = gr.curReader.Read(p[n:])
n += nn
if err == io.EOF {
if n >= lenP {
return n, nil
}
// Open the next file
if err1 := gr.openFile(gr.curIndex + 1); err1 != nil {
return n, err1
}
} else if err != nil {
return n, err
} else if nn == 0 { // empty file
return n, err
}
}
}
// ReadLine reads a line (without delimiter).
// just return io.EOF if no new lines found.
func (gr *GroupReader) ReadLine() (string, error) {
gr.mtx.Lock()
defer gr.mtx.Unlock()
// From PushLine
if gr.curLine != nil {
line := string(gr.curLine)
gr.curLine = nil
return line, nil
}
// Open file if not open yet
if gr.curReader == nil {
err := gr.openFile(gr.curIndex)
if err != nil {
return "", err
}
}
// Iterate over files until line is found
var linePrefix string
for {
bytesRead, err := gr.curReader.ReadBytes('\n')
if err == io.EOF {
// Open the next file
if err1 := gr.openFile(gr.curIndex + 1); err1 != nil {
return "", err1
}
if len(bytesRead) > 0 && bytesRead[len(bytesRead)-1] == byte('\n') {
return linePrefix + string(bytesRead[:len(bytesRead)-1]), nil
}
linePrefix += string(bytesRead)
continue
} else if err != nil {
return "", err
}
return linePrefix + string(bytesRead[:len(bytesRead)-1]), nil
}
}
// IF index > gr.Group.maxIndex, returns io.EOF
// CONTRACT: caller should hold gr.mtx
func (gr *GroupReader) openFile(index int) error {
// Lock on Group to ensure that head doesn't move in the meanwhile.
gr.Group.mtx.Lock()
defer gr.Group.mtx.Unlock()
if index > gr.Group.maxIndex {
return io.EOF
}
curFilePath := filePathForIndex(gr.Head.Path, index, gr.Group.maxIndex)
curFile, err := os.OpenFile(curFilePath, os.O_RDONLY|os.O_CREATE, autoFilePerms)
if err != nil {
return err
}
curReader := bufio.NewReader(curFile)
// Update gr.cur*
if gr.curFile != nil {
gr.curFile.Close() // TODO return error?
}
gr.curIndex = index
gr.curFile = curFile
gr.curReader = curReader
gr.curLine = nil
return nil
}
// PushLine makes the given line the current one, so the next time somebody
// calls ReadLine, this line will be returned.
// panics if called twice without calling ReadLine.
func (gr *GroupReader) PushLine(line string) {
gr.mtx.Lock()
defer gr.mtx.Unlock()
if gr.curLine == nil {
gr.curLine = []byte(line)
} else {
panic("PushLine failed, already have line")
}
}
// CurIndex returns cursor's file index.
func (gr *GroupReader) CurIndex() int {
gr.mtx.Lock()
defer gr.mtx.Unlock()
return gr.curIndex
}
// SetIndex sets the cursor's file index to index by opening a file at this
// position.
func (gr *GroupReader) SetIndex(index int) error {
gr.mtx.Lock()
defer gr.mtx.Unlock()
return gr.openFile(index)
}
//--------------------------------------------------------------------------------
// A simple SearchFunc that assumes that the marker is of form
// <prefix><number>.
// For example, if prefix is '#HEIGHT:', the markers of expected to be of the form:
//
// #HEIGHT:1
// ...
// #HEIGHT:2
// ...
func MakeSimpleSearchFunc(prefix string, target int) SearchFunc {
return func(line string) (int, error) {
if !strings.HasPrefix(line, prefix) {
return -1, fmt.Errorf("Marker line did not have prefix: %v", prefix)
}
i, err := strconv.Atoi(line[len(prefix):])
if err != nil {
return -1, fmt.Errorf("Failed to parse marker line: %v", err.Error())
}
if target < i {
return 1, nil
} else if target == i {
return 0, nil
} else {
return -1, nil
}
}
}