2016-10-26 16:23:19 -07:00
|
|
|
package autofile
|
|
|
|
|
|
|
|
import (
|
|
|
|
"bufio"
|
2016-11-05 17:58:50 -07:00
|
|
|
"errors"
|
2016-11-21 19:09:14 -08:00
|
|
|
"fmt"
|
2016-10-26 16:23:19 -07:00
|
|
|
"io"
|
|
|
|
"os"
|
|
|
|
"path"
|
|
|
|
"path/filepath"
|
|
|
|
"regexp"
|
|
|
|
"strconv"
|
|
|
|
"strings"
|
|
|
|
"sync"
|
|
|
|
"time"
|
2016-11-05 17:58:50 -07:00
|
|
|
|
2018-07-01 22:36:49 -04:00
|
|
|
cmn "github.com/tendermint/tendermint/libs/common"
|
2016-10-26 16:23:19 -07:00
|
|
|
)
|
|
|
|
|
2017-10-17 16:26:52 +04:00
|
|
|
// Defaults applied by OpenGroup unless overridden by a group option, plus the
// per-check cap on file removal.
const (
	// defaultGroupCheckDuration is the default period of the ticker that
	// triggers the head-size and total-size checks.
	defaultGroupCheckDuration = 5000 * time.Millisecond
	// defaultHeadSizeLimit is the default head size limit.
	defaultHeadSizeLimit = 10 * 1024 * 1024 // 10MB
	// defaultTotalSizeLimit is the default total size limit of the group.
	defaultTotalSizeLimit = 1 * 1024 * 1024 * 1024 // 1GB
	// maxFilesToRemove bounds how many files a single total-size check may
	// delete.
	maxFilesToRemove = 4 // needs to be greater than 1
)
|
|
|
|
|
2016-10-26 16:23:19 -07:00
|
|
|
/*
|
|
|
|
You can open a Group to keep restrictions on an AutoFile, like
|
|
|
|
the maximum size of each chunk, and/or the total amount of bytes
|
|
|
|
stored in the group.
|
|
|
|
|
2016-10-26 22:11:43 -07:00
|
|
|
The first file to be written in the Group.Dir is the head file.
|
|
|
|
|
2017-10-17 16:26:52 +04:00
|
|
|
Dir/
|
|
|
|
- <HeadPath>
|
2016-10-26 22:11:43 -07:00
|
|
|
|
|
|
|
Once the Head file reaches the size limit, it will be rotated.
|
|
|
|
|
2017-10-17 16:26:52 +04:00
|
|
|
Dir/
|
|
|
|
- <HeadPath>.000 // First rolled file
|
|
|
|
- <HeadPath> // New head path, starts empty.
|
|
|
|
// The implicit index is 001.
|
2016-10-26 22:11:43 -07:00
|
|
|
|
|
|
|
As more files are written, the index numbers grow...
|
|
|
|
|
2017-10-17 16:26:52 +04:00
|
|
|
Dir/
|
|
|
|
- <HeadPath>.000 // First rolled file
|
|
|
|
- <HeadPath>.001 // Second rolled file
|
|
|
|
- ...
|
|
|
|
- <HeadPath> // New head path
|
2016-10-26 22:11:43 -07:00
|
|
|
|
|
|
|
The Group can also be used to binary-search for some line,
|
|
|
|
assuming that marker lines are written occasionally.
|
2016-10-26 16:23:19 -07:00
|
|
|
*/
|
|
|
|
type Group struct {
|
2018-04-03 12:23:28 +02:00
|
|
|
cmn.BaseService
|
2016-11-21 19:09:14 -08:00
|
|
|
|
2018-09-25 19:22:45 +08:00
|
|
|
ID string
|
|
|
|
Head *AutoFile // The head AutoFile to write to
|
|
|
|
headBuf *bufio.Writer
|
|
|
|
Dir string // Directory that contains .Head
|
|
|
|
ticker *time.Ticker
|
|
|
|
mtx sync.Mutex
|
|
|
|
headSizeLimit int64
|
|
|
|
totalSizeLimit int64
|
|
|
|
groupCheckDuration time.Duration
|
|
|
|
minIndex int // Includes head
|
|
|
|
maxIndex int // Includes head, where Head will move to
|
2016-11-05 17:58:50 -07:00
|
|
|
|
2019-02-06 10:24:43 -05:00
|
|
|
// close this when the processTicks routine is done.
|
|
|
|
// this ensures we can cleanup the dir after calling Stop
|
|
|
|
// and the routine won't be trying to access it anymore
|
|
|
|
doneProcessTicks chan struct{}
|
|
|
|
|
2016-11-05 17:58:50 -07:00
|
|
|
// TODO: When we start deleting files, we need to start tracking GroupReaders
|
|
|
|
// and their dependencies.
|
2016-10-26 16:23:19 -07:00
|
|
|
}
|
|
|
|
|
2018-06-03 11:56:57 +04:00
|
|
|
// OpenGroup creates a new Group with head at headPath. It returns an error if
|
|
|
|
// it fails to open head file.
|
2018-09-25 19:22:45 +08:00
|
|
|
func OpenGroup(headPath string, groupOptions ...func(*Group)) (g *Group, err error) {
|
2016-11-21 19:09:14 -08:00
|
|
|
dir := path.Dir(headPath)
|
|
|
|
head, err := OpenAutoFile(headPath)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2016-10-26 16:23:19 -07:00
|
|
|
|
|
|
|
g = &Group{
|
2018-09-25 19:22:45 +08:00
|
|
|
ID: "group:" + head.ID,
|
|
|
|
Head: head,
|
|
|
|
headBuf: bufio.NewWriterSize(head, 4096*10),
|
|
|
|
Dir: dir,
|
|
|
|
headSizeLimit: defaultHeadSizeLimit,
|
|
|
|
totalSizeLimit: defaultTotalSizeLimit,
|
|
|
|
groupCheckDuration: defaultGroupCheckDuration,
|
|
|
|
minIndex: 0,
|
|
|
|
maxIndex: 0,
|
2019-02-06 10:24:43 -05:00
|
|
|
doneProcessTicks: make(chan struct{}),
|
2016-10-26 16:23:19 -07:00
|
|
|
}
|
2018-09-25 19:22:45 +08:00
|
|
|
|
|
|
|
for _, option := range groupOptions {
|
|
|
|
option(g)
|
|
|
|
}
|
|
|
|
|
2018-04-03 12:23:28 +02:00
|
|
|
g.BaseService = *cmn.NewBaseService(nil, "Group", g)
|
2016-11-21 19:09:14 -08:00
|
|
|
|
2016-10-26 21:50:07 -07:00
|
|
|
gInfo := g.readGroupInfo()
|
|
|
|
g.minIndex = gInfo.MinIndex
|
|
|
|
g.maxIndex = gInfo.MaxIndex
|
2016-11-21 19:09:14 -08:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2018-09-25 19:22:45 +08:00
|
|
|
// GroupCheckDuration allows you to overwrite default groupCheckDuration.
|
|
|
|
func GroupCheckDuration(duration time.Duration) func(*Group) {
|
|
|
|
return func(g *Group) {
|
|
|
|
g.groupCheckDuration = duration
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// GroupHeadSizeLimit allows you to overwrite default head size limit - 10MB.
|
|
|
|
func GroupHeadSizeLimit(limit int64) func(*Group) {
|
|
|
|
return func(g *Group) {
|
|
|
|
g.headSizeLimit = limit
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// GroupTotalSizeLimit allows you to overwrite default total size limit of the group - 1GB.
|
|
|
|
func GroupTotalSizeLimit(limit int64) func(*Group) {
|
|
|
|
return func(g *Group) {
|
|
|
|
g.totalSizeLimit = limit
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-02-25 09:11:07 +04:00
|
|
|
// OnStart implements cmn.Service by starting the goroutine that checks file
|
|
|
|
// and group limits.
|
2016-11-21 19:09:14 -08:00
|
|
|
func (g *Group) OnStart() error {
|
2018-09-25 19:22:45 +08:00
|
|
|
g.ticker = time.NewTicker(g.groupCheckDuration)
|
2016-10-26 16:23:19 -07:00
|
|
|
go g.processTicks()
|
2016-11-21 19:09:14 -08:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2019-02-25 09:11:07 +04:00
|
|
|
// OnStop implements cmn.Service by stopping the goroutine described above.
|
2018-06-03 11:56:57 +04:00
|
|
|
// NOTE: g.Head must be closed separately using Close.
|
2016-11-21 19:09:14 -08:00
|
|
|
func (g *Group) OnStop() {
|
|
|
|
g.ticker.Stop()
|
2019-02-25 09:11:07 +04:00
|
|
|
g.FlushAndSync()
|
2018-06-03 11:56:57 +04:00
|
|
|
}
|
|
|
|
|
2019-02-25 09:11:07 +04:00
|
|
|
// Wait blocks until all internal goroutines are finished. Supposed to be
|
|
|
|
// called after Stop.
|
2019-02-06 10:24:43 -05:00
|
|
|
func (g *Group) Wait() {
|
|
|
|
// wait for processTicks routine to finish
|
|
|
|
<-g.doneProcessTicks
|
|
|
|
}
|
|
|
|
|
2018-06-03 11:56:57 +04:00
|
|
|
// Close closes the head file. The group must be stopped by this moment.
|
|
|
|
func (g *Group) Close() {
|
2019-02-25 09:11:07 +04:00
|
|
|
g.FlushAndSync()
|
2018-06-03 11:56:57 +04:00
|
|
|
|
|
|
|
g.mtx.Lock()
|
|
|
|
_ = g.Head.closeFile()
|
|
|
|
g.mtx.Unlock()
|
2016-10-26 16:23:19 -07:00
|
|
|
}
|
|
|
|
|
2017-10-17 16:26:52 +04:00
|
|
|
// HeadSizeLimit returns the current head size limit.
|
2016-10-26 16:23:19 -07:00
|
|
|
func (g *Group) HeadSizeLimit() int64 {
|
|
|
|
g.mtx.Lock()
|
|
|
|
defer g.mtx.Unlock()
|
|
|
|
return g.headSizeLimit
|
|
|
|
}
|
|
|
|
|
2017-10-17 16:26:52 +04:00
|
|
|
// TotalSizeLimit returns total size limit of the group.
|
2016-10-26 16:23:19 -07:00
|
|
|
func (g *Group) TotalSizeLimit() int64 {
|
|
|
|
g.mtx.Lock()
|
|
|
|
defer g.mtx.Unlock()
|
|
|
|
return g.totalSizeLimit
|
|
|
|
}
|
|
|
|
|
2017-10-17 16:26:52 +04:00
|
|
|
// MaxIndex returns index of the last file in the group.
|
2016-10-26 21:50:07 -07:00
|
|
|
func (g *Group) MaxIndex() int {
|
|
|
|
g.mtx.Lock()
|
|
|
|
defer g.mtx.Unlock()
|
|
|
|
return g.maxIndex
|
|
|
|
}
|
|
|
|
|
2017-10-20 13:09:11 +04:00
|
|
|
// MinIndex returns index of the first file in the group.
|
|
|
|
func (g *Group) MinIndex() int {
|
|
|
|
g.mtx.Lock()
|
|
|
|
defer g.mtx.Unlock()
|
|
|
|
return g.minIndex
|
|
|
|
}
|
|
|
|
|
2017-10-17 16:48:44 +04:00
|
|
|
// Write writes the contents of p into the current head of the group. It
|
|
|
|
// returns the number of bytes written. If nn < len(p), it also returns an
|
|
|
|
// error explaining why the write is short.
|
|
|
|
// NOTE: Writes are buffered so they don't write synchronously
|
|
|
|
// TODO: Make it halt if space is unavailable
|
|
|
|
func (g *Group) Write(p []byte) (nn int, err error) {
|
|
|
|
g.mtx.Lock()
|
|
|
|
defer g.mtx.Unlock()
|
|
|
|
return g.headBuf.Write(p)
|
|
|
|
}
|
|
|
|
|
2017-10-17 16:26:52 +04:00
|
|
|
// WriteLine writes line into the current head of the group. It also appends "\n".
|
2016-11-20 17:19:15 -08:00
|
|
|
// NOTE: Writes are buffered so they don't write synchronously
|
2016-10-28 13:56:31 -07:00
|
|
|
// TODO: Make it halt if space is unavailable
|
|
|
|
func (g *Group) WriteLine(line string) error {
|
2016-11-21 19:09:14 -08:00
|
|
|
g.mtx.Lock()
|
|
|
|
defer g.mtx.Unlock()
|
2016-11-20 17:19:15 -08:00
|
|
|
_, err := g.headBuf.Write([]byte(line + "\n"))
|
2016-10-28 13:56:31 -07:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2019-02-20 07:45:18 +02:00
|
|
|
// Buffered returns the size of the currently buffered data.
|
|
|
|
func (g *Group) Buffered() int {
|
|
|
|
g.mtx.Lock()
|
|
|
|
defer g.mtx.Unlock()
|
|
|
|
return g.headBuf.Buffered()
|
|
|
|
}
|
|
|
|
|
2019-02-25 09:11:07 +04:00
|
|
|
// FlushAndSync writes any buffered data to the underlying file and commits the
|
|
|
|
// current content of the file to stable storage (fsync).
|
|
|
|
func (g *Group) FlushAndSync() error {
|
2016-11-21 19:09:14 -08:00
|
|
|
g.mtx.Lock()
|
|
|
|
defer g.mtx.Unlock()
|
call fsync after flush (Refs #573)
short: flushing the bufio buffer is not enough to ensure data
consistency.
long:
Saving an entry to the WAL calls writeLine to append data to the
autofile group backing the WAL, then calls group.Flush() to flush that
data to persistent storage. group.Flush() in turn proxies to
headBuf.flush(), flushing the active bufio.BufferedWriter. However,
BufferedWriter wraps a Writer, not another BufferedWriter, and the way
it flushes is by calling io.Writer.Write() to clear the BufferedWriter's
buffer. The io.Writer we're wrapping here is AutoFile, whose Write
method calls os.File.Write(), performing an unbuffered write to the
operating system, where, I assume, it sits in the OS buffers awaiting
sync. This means that Wal.Save does not, in fact, ensure the saved
operation is synced to disk before returning.
2017-09-21 16:11:28 -07:00
|
|
|
err := g.headBuf.Flush()
|
|
|
|
if err == nil {
|
|
|
|
err = g.Head.Sync()
|
|
|
|
}
|
|
|
|
return err
|
2016-10-26 16:23:19 -07:00
|
|
|
}
|
|
|
|
|
|
|
|
func (g *Group) processTicks() {
|
2019-02-06 10:24:43 -05:00
|
|
|
defer close(g.doneProcessTicks)
|
2018-08-31 14:05:49 -04:00
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case <-g.ticker.C:
|
|
|
|
g.checkHeadSizeLimit()
|
|
|
|
g.checkTotalSizeLimit()
|
|
|
|
case <-g.Quit():
|
|
|
|
return
|
|
|
|
}
|
2016-10-26 16:23:19 -07:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// NOTE: this function is called manually in tests.
|
|
|
|
func (g *Group) checkHeadSizeLimit() {
|
2016-11-21 19:09:14 -08:00
|
|
|
limit := g.HeadSizeLimit()
|
|
|
|
if limit == 0 {
|
|
|
|
return
|
|
|
|
}
|
2016-10-26 16:23:19 -07:00
|
|
|
size, err := g.Head.Size()
|
|
|
|
if err != nil {
|
2018-11-06 13:14:47 +01:00
|
|
|
g.Logger.Error("Group's head may grow without bound", "head", g.Head.Path, "err", err)
|
|
|
|
return
|
2016-10-26 16:23:19 -07:00
|
|
|
}
|
2016-11-21 19:09:14 -08:00
|
|
|
if size >= limit {
|
2016-10-26 16:23:19 -07:00
|
|
|
g.RotateFile()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (g *Group) checkTotalSizeLimit() {
|
2016-11-21 19:09:14 -08:00
|
|
|
limit := g.TotalSizeLimit()
|
|
|
|
if limit == 0 {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
gInfo := g.readGroupInfo()
|
|
|
|
totalSize := gInfo.TotalSize
|
|
|
|
for i := 0; i < maxFilesToRemove; i++ {
|
|
|
|
index := gInfo.MinIndex + i
|
|
|
|
if totalSize < limit {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
if index == gInfo.MaxIndex {
|
|
|
|
// Special degenerate case, just do nothing.
|
2018-11-06 13:14:47 +01:00
|
|
|
g.Logger.Error("Group's head may grow without bound", "head", g.Head.Path)
|
2016-11-21 19:09:14 -08:00
|
|
|
return
|
|
|
|
}
|
2016-11-21 19:57:17 -08:00
|
|
|
pathToRemove := filePathForIndex(g.Head.Path, index, gInfo.MaxIndex)
|
2018-11-06 13:14:47 +01:00
|
|
|
fInfo, err := os.Stat(pathToRemove)
|
2016-11-21 19:09:14 -08:00
|
|
|
if err != nil {
|
2018-11-06 13:12:12 +01:00
|
|
|
g.Logger.Error("Failed to fetch info for file", "file", pathToRemove)
|
2016-11-21 19:09:14 -08:00
|
|
|
continue
|
|
|
|
}
|
|
|
|
err = os.Remove(pathToRemove)
|
|
|
|
if err != nil {
|
2018-11-06 13:12:12 +01:00
|
|
|
g.Logger.Error("Failed to remove path", "path", pathToRemove)
|
2016-11-21 19:09:14 -08:00
|
|
|
return
|
|
|
|
}
|
2018-11-06 13:14:47 +01:00
|
|
|
totalSize -= fInfo.Size()
|
2016-11-21 19:09:14 -08:00
|
|
|
}
|
2016-10-26 16:23:19 -07:00
|
|
|
}
|
|
|
|
|
2017-10-17 16:26:52 +04:00
|
|
|
// RotateFile causes group to close the current head and assign it some index.
|
|
|
|
// Note it does not create a new head.
|
2016-10-26 16:23:19 -07:00
|
|
|
func (g *Group) RotateFile() {
|
|
|
|
g.mtx.Lock()
|
|
|
|
defer g.mtx.Unlock()
|
|
|
|
|
2017-05-06 22:48:08 +04:00
|
|
|
headPath := g.Head.Path
|
|
|
|
|
2018-09-25 19:22:45 +08:00
|
|
|
if err := g.headBuf.Flush(); err != nil {
|
2018-10-05 01:57:59 +04:00
|
|
|
panic(err)
|
2018-09-25 19:22:45 +08:00
|
|
|
}
|
|
|
|
|
|
|
|
if err := g.Head.Sync(); err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
|
2017-05-06 22:48:08 +04:00
|
|
|
if err := g.Head.closeFile(); err != nil {
|
2016-10-26 16:23:19 -07:00
|
|
|
panic(err)
|
|
|
|
}
|
2017-05-06 22:48:08 +04:00
|
|
|
|
|
|
|
indexPath := filePathForIndex(headPath, g.maxIndex, g.maxIndex+1)
|
|
|
|
if err := os.Rename(headPath, indexPath); err != nil {
|
2016-10-26 16:23:19 -07:00
|
|
|
panic(err)
|
|
|
|
}
|
2017-05-06 22:48:08 +04:00
|
|
|
|
2018-04-03 12:23:28 +02:00
|
|
|
g.maxIndex++
|
2016-10-26 16:23:19 -07:00
|
|
|
}
|
|
|
|
|
2017-10-17 16:26:52 +04:00
|
|
|
// NewReader returns a new group reader.
|
|
|
|
// CONTRACT: Caller must close the returned GroupReader.
|
2016-10-30 02:40:39 -07:00
|
|
|
func (g *Group) NewReader(index int) (*GroupReader, error) {
|
2016-10-26 16:23:19 -07:00
|
|
|
r := newGroupReader(g)
|
2016-10-30 02:40:39 -07:00
|
|
|
err := r.SetIndex(index)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2018-04-03 12:23:28 +02:00
|
|
|
return r, nil
|
2016-10-26 16:23:19 -07:00
|
|
|
}
|
|
|
|
|
2017-10-17 16:26:52 +04:00
|
|
|
// GroupInfo holds information about the group, as gathered by scanning the
// directory of the head file.
type GroupInfo struct {
	// MinIndex is the index of the first file in the group, including head.
	MinIndex int
	// MaxIndex is the index of the last file in the group, including head.
	MaxIndex int
	// TotalSize is the total size of the group.
	TotalSize int64
	// HeadSize is the size of the head.
	HeadSize int64
}
|
|
|
|
|
2017-10-17 16:26:52 +04:00
|
|
|
// Returns info after scanning all files in g.Head's dir.
|
2016-10-26 16:23:19 -07:00
|
|
|
func (g *Group) ReadGroupInfo() GroupInfo {
|
|
|
|
g.mtx.Lock()
|
|
|
|
defer g.mtx.Unlock()
|
|
|
|
return g.readGroupInfo()
|
|
|
|
}
|
|
|
|
|
2016-10-26 21:50:07 -07:00
|
|
|
// Index includes the head.
|
2016-10-26 16:23:19 -07:00
|
|
|
// CONTRACT: caller should have called g.mtx.Lock
|
|
|
|
func (g *Group) readGroupInfo() GroupInfo {
|
|
|
|
groupDir := filepath.Dir(g.Head.Path)
|
|
|
|
headBase := filepath.Base(g.Head.Path)
|
|
|
|
var minIndex, maxIndex int = -1, -1
|
|
|
|
var totalSize, headSize int64 = 0, 0
|
|
|
|
|
|
|
|
dir, err := os.Open(groupDir)
|
|
|
|
if err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
2016-12-06 01:46:23 -08:00
|
|
|
defer dir.Close()
|
2016-10-26 16:23:19 -07:00
|
|
|
fiz, err := dir.Readdir(0)
|
|
|
|
if err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// For each file in the directory, filter by pattern
|
|
|
|
for _, fileInfo := range fiz {
|
|
|
|
if fileInfo.Name() == headBase {
|
|
|
|
fileSize := fileInfo.Size()
|
|
|
|
totalSize += fileSize
|
|
|
|
headSize = fileSize
|
|
|
|
continue
|
|
|
|
} else if strings.HasPrefix(fileInfo.Name(), headBase) {
|
|
|
|
fileSize := fileInfo.Size()
|
|
|
|
totalSize += fileSize
|
|
|
|
indexedFilePattern := regexp.MustCompile(`^.+\.([0-9]{3,})$`)
|
|
|
|
submatch := indexedFilePattern.FindSubmatch([]byte(fileInfo.Name()))
|
|
|
|
if len(submatch) != 0 {
|
|
|
|
// Matches
|
|
|
|
fileIndex, err := strconv.Atoi(string(submatch[1]))
|
|
|
|
if err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
if maxIndex < fileIndex {
|
|
|
|
maxIndex = fileIndex
|
|
|
|
}
|
|
|
|
if minIndex == -1 || fileIndex < minIndex {
|
|
|
|
minIndex = fileIndex
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-10-26 21:50:07 -07:00
|
|
|
// Now account for the head.
|
|
|
|
if minIndex == -1 {
|
|
|
|
// If there were no numbered files,
|
|
|
|
// then the head is index 0.
|
|
|
|
minIndex, maxIndex = 0, 0
|
|
|
|
} else {
|
|
|
|
// Otherwise, the head file is 1 greater
|
2018-04-03 12:23:28 +02:00
|
|
|
maxIndex++
|
2016-10-26 21:50:07 -07:00
|
|
|
}
|
2016-10-26 16:23:19 -07:00
|
|
|
return GroupInfo{minIndex, maxIndex, totalSize, headSize}
|
|
|
|
}
|
|
|
|
|
2016-11-21 19:09:14 -08:00
|
|
|
// filePathForIndex returns the path of the group file with the given index.
// The file at maxIndex is the head itself, so headPath is returned unchanged;
// any other index yields headPath followed by a dot and the index zero-padded
// to at least three digits (e.g. "<headPath>.007").
func filePathForIndex(headPath string, index int, maxIndex int) string {
	if index == maxIndex {
		return headPath
	}
	return fmt.Sprintf("%v.%03d", headPath, index)
}
|
|
|
|
|
|
|
|
//--------------------------------------------------------------------------------
|
|
|
|
|
2017-10-17 16:26:52 +04:00
|
|
|
// GroupReader provides an interface for reading from a Group.
|
2016-10-26 16:23:19 -07:00
|
|
|
type GroupReader struct {
|
|
|
|
*Group
|
|
|
|
mtx sync.Mutex
|
|
|
|
curIndex int
|
|
|
|
curFile *os.File
|
|
|
|
curReader *bufio.Reader
|
|
|
|
curLine []byte
|
|
|
|
}
|
|
|
|
|
|
|
|
func newGroupReader(g *Group) *GroupReader {
|
|
|
|
return &GroupReader{
|
|
|
|
Group: g,
|
2016-10-26 21:50:07 -07:00
|
|
|
curIndex: 0,
|
2016-10-26 16:23:19 -07:00
|
|
|
curFile: nil,
|
|
|
|
curReader: nil,
|
|
|
|
curLine: nil,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-10-17 16:26:52 +04:00
|
|
|
// Close closes the GroupReader by closing the cursor file.
|
2016-10-26 21:50:07 -07:00
|
|
|
func (gr *GroupReader) Close() error {
|
|
|
|
gr.mtx.Lock()
|
|
|
|
defer gr.mtx.Unlock()
|
|
|
|
|
|
|
|
if gr.curReader != nil {
|
|
|
|
err := gr.curFile.Close()
|
|
|
|
gr.curIndex = 0
|
|
|
|
gr.curReader = nil
|
|
|
|
gr.curFile = nil
|
|
|
|
gr.curLine = nil
|
|
|
|
return err
|
|
|
|
}
|
2018-04-03 12:23:28 +02:00
|
|
|
return nil
|
2016-10-26 21:50:07 -07:00
|
|
|
}
|
|
|
|
|
2017-10-20 12:38:45 +04:00
|
|
|
// Read implements io.Reader, reading bytes from the current Reader
|
|
|
|
// incrementing index until enough bytes are read.
|
|
|
|
func (gr *GroupReader) Read(p []byte) (n int, err error) {
|
2017-10-23 13:02:02 +04:00
|
|
|
lenP := len(p)
|
|
|
|
if lenP == 0 {
|
|
|
|
return 0, errors.New("given empty slice")
|
|
|
|
}
|
|
|
|
|
2017-10-20 12:38:45 +04:00
|
|
|
gr.mtx.Lock()
|
|
|
|
defer gr.mtx.Unlock()
|
|
|
|
|
|
|
|
// Open file if not open yet
|
|
|
|
if gr.curReader == nil {
|
|
|
|
if err = gr.openFile(gr.curIndex); err != nil {
|
|
|
|
return 0, err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Iterate over files until enough bytes are read
|
2017-10-24 23:19:53 +04:00
|
|
|
var nn int
|
2017-10-20 12:38:45 +04:00
|
|
|
for {
|
2017-10-24 23:19:53 +04:00
|
|
|
nn, err = gr.curReader.Read(p[n:])
|
2017-10-20 12:38:45 +04:00
|
|
|
n += nn
|
2019-08-02 08:53:52 +02:00
|
|
|
switch {
|
|
|
|
case err == io.EOF:
|
2017-10-20 12:38:45 +04:00
|
|
|
if n >= lenP {
|
|
|
|
return n, nil
|
2018-04-03 12:23:28 +02:00
|
|
|
}
|
|
|
|
// Open the next file
|
|
|
|
if err1 := gr.openFile(gr.curIndex + 1); err1 != nil {
|
|
|
|
return n, err1
|
2017-10-20 12:38:45 +04:00
|
|
|
}
|
2019-08-02 08:53:52 +02:00
|
|
|
case err != nil:
|
2017-10-20 12:38:45 +04:00
|
|
|
return n, err
|
2019-08-02 08:53:52 +02:00
|
|
|
case nn == 0: // empty file
|
2017-10-23 13:02:14 +04:00
|
|
|
return n, err
|
2017-10-20 12:38:45 +04:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-10-26 21:50:07 -07:00
|
|
|
// IF index > gr.Group.maxIndex, returns io.EOF
|
|
|
|
// CONTRACT: caller should hold gr.mtx
|
|
|
|
func (gr *GroupReader) openFile(index int) error {
|
2016-10-26 16:23:19 -07:00
|
|
|
// Lock on Group to ensure that head doesn't move in the meanwhile.
|
2016-10-26 21:50:07 -07:00
|
|
|
gr.Group.mtx.Lock()
|
|
|
|
defer gr.Group.mtx.Unlock()
|
|
|
|
|
2016-11-21 19:09:14 -08:00
|
|
|
if index > gr.Group.maxIndex {
|
2016-10-26 21:50:07 -07:00
|
|
|
return io.EOF
|
|
|
|
}
|
2016-10-26 16:23:19 -07:00
|
|
|
|
2016-11-21 19:09:14 -08:00
|
|
|
curFilePath := filePathForIndex(gr.Head.Path, index, gr.Group.maxIndex)
|
2018-10-05 01:57:59 +04:00
|
|
|
curFile, err := os.OpenFile(curFilePath, os.O_RDONLY|os.O_CREATE, autoFilePerms)
|
2016-10-26 16:23:19 -07:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
curReader := bufio.NewReader(curFile)
|
|
|
|
|
2016-10-26 21:50:07 -07:00
|
|
|
// Update gr.cur*
|
2016-12-06 01:46:23 -08:00
|
|
|
if gr.curFile != nil {
|
|
|
|
gr.curFile.Close() // TODO return error?
|
|
|
|
}
|
2016-10-26 21:50:07 -07:00
|
|
|
gr.curIndex = index
|
|
|
|
gr.curFile = curFile
|
|
|
|
gr.curReader = curReader
|
|
|
|
gr.curLine = nil
|
2016-10-26 16:23:19 -07:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2017-10-17 16:26:52 +04:00
|
|
|
// CurIndex returns cursor's file index.
|
2016-10-26 21:50:07 -07:00
|
|
|
func (gr *GroupReader) CurIndex() int {
|
|
|
|
gr.mtx.Lock()
|
|
|
|
defer gr.mtx.Unlock()
|
|
|
|
return gr.curIndex
|
2016-10-26 16:23:19 -07:00
|
|
|
}
|
|
|
|
|
2017-10-17 16:26:52 +04:00
|
|
|
// SetIndex sets the cursor's file index to index by opening a file at this
|
|
|
|
// position.
|
2016-10-30 02:40:39 -07:00
|
|
|
func (gr *GroupReader) SetIndex(index int) error {
|
2016-10-26 21:50:07 -07:00
|
|
|
gr.mtx.Lock()
|
|
|
|
defer gr.mtx.Unlock()
|
2016-10-30 02:40:39 -07:00
|
|
|
return gr.openFile(index)
|
2016-10-26 16:23:19 -07:00
|
|
|
}
|