From 28b3d52948b1590be9e3d2e456780d5e41929aca Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Wed, 26 Oct 2016 16:22:43 -0700 Subject: [PATCH 01/17] first commit --- README.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 README.md diff --git a/README.md b/README.md new file mode 100644 index 00000000..23799200 --- /dev/null +++ b/README.md @@ -0,0 +1 @@ +# go-autofile From 1859c4d5fe2a0cbb0071b010ef8f604bb397feca Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Wed, 26 Oct 2016 16:23:19 -0700 Subject: [PATCH 02/17] First commit --- .gitignore | 2 + autofile.go | 116 ++++++++++++++ autofile_test.go | 73 +++++++++ group.go | 396 ++++++++++++++++++++++++++++++++++++++++++++++ group_test.go | 110 +++++++++++++ sighup_watcher.go | 63 ++++++++ 6 files changed, 760 insertions(+) create mode 100644 .gitignore create mode 100644 autofile.go create mode 100644 autofile_test.go create mode 100644 group.go create mode 100644 group_test.go create mode 100644 sighup_watcher.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 00000000..38193138 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +*.swp +*.swo diff --git a/autofile.go b/autofile.go new file mode 100644 index 00000000..ed9d549b --- /dev/null +++ b/autofile.go @@ -0,0 +1,116 @@ +package autofile + +import ( + . "github.com/tendermint/go-common" + "os" + "sync" + "time" +) + +/* AutoFile usage + +// Create/Append to ./autofile_test +af, err := OpenAutoFile("autofile_test") +if err != nil { + panic(err) +} + +// Stream of writes. +// During this time, the file may be moved e.g. by logRotate. +for i := 0; i < 60; i++ { + af.Write([]byte(Fmt("LOOP(%v)", i))) + time.Sleep(time.Second) +} + +// Close the AutoFile +err = af.Close() +if err != nil { + panic(err) +} +*/ + +const autoFileOpenDuration = 1000 * time.Millisecond + +// Automatically closes and re-opens file for writing. +// This is useful for using a log file with the logrotate tool. +type AutoFile struct { + ID string + Path string + ticker *time.Ticker + mtx sync.Mutex + file *os.File +} + +func OpenAutoFile(path string) (af *AutoFile, err error) { + af = &AutoFile{ + ID: RandStr(12) + ":" + path, + Path: path, + ticker: time.NewTicker(autoFileOpenDuration), + } + if err = af.openFile(); err != nil { + return + } + go af.processTicks() + sighupWatchers.addAutoFile(af) + return +} + +func (af *AutoFile) Close() error { + af.ticker.Stop() + err := af.closeFile() + sighupWatchers.removeAutoFile(af) + return err +} + +func (af *AutoFile) processTicks() { + for { + _, ok := <-af.ticker.C + if !ok { + return // Done. + } + af.closeFile() + } +} + +func (af *AutoFile) closeFile() (err error) { + af.mtx.Lock() + defer af.mtx.Unlock() + + file := af.file + if file == nil { + return nil + } + af.file = nil + return file.Close() +} + +func (af *AutoFile) Write(b []byte) (n int, err error) { + af.mtx.Lock() + defer af.mtx.Unlock() + if af.file == nil { + if err = af.openFile(); err != nil { + return + } + } + return af.file.Write(b) +} + +func (af *AutoFile) openFile() error { + file, err := os.OpenFile(af.Path, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0600) + if err != nil { + return err + } + af.file = file + return nil +} + +func (af *AutoFile) Size() (int64, error) { + af.mtx.Lock() + defer af.mtx.Unlock() + stat, err := af.file.Stat() + if err != nil { + return -1, err + } + return stat.Size(), nil + +} diff --git a/autofile_test.go b/autofile_test.go new file mode 100644 index 00000000..243125ca --- /dev/null +++ b/autofile_test.go @@ -0,0 +1,73 @@ +package autofile + +import ( + . 
"github.com/tendermint/go-common" + "os" + "sync/atomic" + "syscall" + "testing" + "time" +) + +func TestSIGHUP(t *testing.T) { + + // First, create an AutoFile writing to a tempfile dir + file, name := Tempfile("sighup_test") + err := file.Close() + if err != nil { + t.Fatalf("Error creating tempfile: %v", err) + } + // Here is the actual AutoFile + af, err := OpenAutoFile(name) + if err != nil { + t.Fatalf("Error creating autofile: %v", err) + } + + // Write to the file. + _, err = af.Write([]byte("Line 1\n")) + if err != nil { + t.Fatalf("Error writing to autofile: %v", err) + } + _, err = af.Write([]byte("Line 2\n")) + if err != nil { + t.Fatalf("Error writing to autofile: %v", err) + } + + // Move the file over + err = os.Rename(name, name+"_old") + if err != nil { + t.Fatalf("Error moving autofile: %v", err) + } + + // Send SIGHUP to self. + oldSighupCounter := atomic.LoadInt32(&sighupCounter) + syscall.Kill(syscall.Getpid(), syscall.SIGHUP) + + // Wait a bit... signals are not handled synchronously. + for atomic.LoadInt32(&sighupCounter) == oldSighupCounter { + time.Sleep(time.Millisecond * 10) + } + + // Write more to the file. + _, err = af.Write([]byte("Line 3\n")) + if err != nil { + t.Fatalf("Error writing to autofile: %v", err) + } + _, err = af.Write([]byte("Line 4\n")) + if err != nil { + t.Fatalf("Error writing to autofile: %v", err) + } + err = af.Close() + if err != nil { + t.Fatalf("Error closing autofile") + } + + // Both files should exist + if body := MustReadFile(name + "_old"); string(body) != "Line 1\nLine 2\n" { + t.Errorf("Unexpected body %s", body) + } + if body := MustReadFile(name); string(body) != "Line 3\nLine 4\n" { + t.Errorf("Unexpected body %s", body) + } + +} diff --git a/group.go b/group.go new file mode 100644 index 00000000..c0d199e1 --- /dev/null +++ b/group.go @@ -0,0 +1,396 @@ +package autofile + +import ( + "bufio" + "fmt" + "io" + "os" + "path" + "path/filepath" + "regexp" + "strconv" + "strings" + "sync" + "time" +) + +/* +You can open a Group to keep restrictions on an AutoFile, like +the maximum size of each chunk, and/or the total amount of bytes +stored in the group. + +The Group can also be used to binary-search, and to read atomically +with respect to the Group's Head (the AutoFile being appended to) +*/ + +const groupCheckDuration = 1000 * time.Millisecond + +type Group struct { + ID string + Head *AutoFile // The head AutoFile to write to + Dir string // Directory that contains .Head + ticker *time.Ticker + mtx sync.Mutex + headSizeLimit int64 + totalSizeLimit int64 +} + +func OpenGroup(head *AutoFile) (g *Group, err error) { + dir := path.Dir(head.Path) + + g = &Group{ + ID: "group:" + head.ID, + Head: head, + Dir: dir, + ticker: time.NewTicker(groupCheckDuration), + } + go g.processTicks() + return +} + +func (g *Group) SetHeadSizeLimit(limit int64) { + g.mtx.Lock() + g.headSizeLimit = limit + g.mtx.Unlock() +} + +func (g *Group) HeadSizeLimit() int64 { + g.mtx.Lock() + defer g.mtx.Unlock() + return g.headSizeLimit +} + +func (g *Group) SetTotalSizeLimit(limit int64) { + g.mtx.Lock() + g.totalSizeLimit = limit + g.mtx.Unlock() +} + +func (g *Group) TotalSizeLimit() int64 { + g.mtx.Lock() + defer g.mtx.Unlock() + return g.totalSizeLimit +} + +func (g *Group) Close() error { + g.ticker.Stop() + return nil +} + +func (g *Group) processTicks() { + for { + _, ok := <-g.ticker.C + if !ok { + return // Done. 
+ } + // TODO Check head size limit + // TODO check total size limit + } +} + +// NOTE: for testing +func (g *Group) stopTicker() { + g.ticker.Stop() +} + +// NOTE: this function is called manually in tests. +func (g *Group) checkHeadSizeLimit() { + size, err := g.Head.Size() + if err != nil { + panic(err) + } + if size >= g.HeadSizeLimit() { + g.RotateFile() + } +} + +func (g *Group) checkTotalSizeLimit() { + // TODO enforce total size limit +} + +func (g *Group) RotateFile() { + g.mtx.Lock() + defer g.mtx.Unlock() + + gInfo := g.readGroupInfo() + dstPath := filePathForIndex(g.Head.Path, gInfo.MaxIndex+1) + err := os.Rename(g.Head.Path, dstPath) + if err != nil { + panic(err) + } + err = g.Head.closeFile() + if err != nil { + panic(err) + } +} + +func (g *Group) NewReader(index int) *GroupReader { + r := newGroupReader(g) + r.SetIndex(index) + return r +} + +// Returns -1 if line comes after, 0 if found, 1 if line comes before. +type SearchFunc func(line string) (int, error) + +// Searches for the right file in Group, +// then returns a GroupReader to start streaming lines +// CONTRACT: caller is responsible for closing GroupReader. +func (g *Group) Search(prefix string, cmp SearchFunc) (*GroupReader, error) { + gInfo := g.ReadGroupInfo() + minIndex, maxIndex := gInfo.MinIndex, gInfo.MaxIndex + curIndex := (minIndex + maxIndex + 1) / 2 + + for { + + // Base case, when there's only 1 choice left. + if minIndex == maxIndex { + r := g.NewReader(maxIndex) + err := scanUntil(r, prefix, cmp) + if err != nil { + r.Close() + return nil, err + } else { + return r, err + } + } + + // Read starting roughly at the middle file, + // until we find line that has prefix. + r := g.NewReader(curIndex) + foundIndex, line, err := scanFirst(r, prefix) + r.Close() + if err != nil { + return nil, err + } + + // Compare this line to our search query. 
+ val, err := cmp(line) + if err != nil { + return nil, err + } + if val < 0 { + // Line will come later + minIndex = foundIndex + } else if val == 0 { + // Stroke of luck, found the line + r := g.NewReader(foundIndex) + err := scanUntil(r, prefix, cmp) + if err != nil { + r.Close() + return nil, err + } else { + return r, err + } + } else { + // We passed it + maxIndex = curIndex - 1 + } + } + +} + +// Scans and returns the first line that starts with 'prefix' +func scanFirst(r *GroupReader, prefix string) (int, string, error) { + for { + line, err := r.ReadLine() + if err != nil { + return 0, "", err + } + if !strings.HasPrefix(line, prefix) { + continue + } + index := r.CurIndex() + return index, line, nil + } +} + +func scanUntil(r *GroupReader, prefix string, cmp SearchFunc) error { + for { + line, err := r.ReadLine() + if err != nil { + return err + } + if !strings.HasPrefix(line, prefix) { + continue + } + val, err := cmp(line) + if err != nil { + return err + } + if val < 0 { + continue + } else { + r.PushLine(line) + return nil + } + } +} + +type GroupInfo struct { + MinIndex int + MaxIndex int + TotalSize int64 + HeadSize int64 +} + +// Returns info after scanning all files in g.Head's dir +func (g *Group) ReadGroupInfo() GroupInfo { + g.mtx.Lock() + defer g.mtx.Unlock() + return g.readGroupInfo() +} + +// CONTRACT: caller should have called g.mtx.Lock +func (g *Group) readGroupInfo() GroupInfo { + groupDir := filepath.Dir(g.Head.Path) + headBase := filepath.Base(g.Head.Path) + var minIndex, maxIndex int = -1, -1 + var totalSize, headSize int64 = 0, 0 + + dir, err := os.Open(groupDir) + if err != nil { + panic(err) + } + fiz, err := dir.Readdir(0) + if err != nil { + panic(err) + } + + // For each file in the directory, filter by pattern + for _, fileInfo := range fiz { + if fileInfo.Name() == headBase { + fileSize := fileInfo.Size() + totalSize += fileSize + headSize = fileSize + continue + } else if strings.HasPrefix(fileInfo.Name(), headBase) { + fileSize := fileInfo.Size() + totalSize += fileSize + indexedFilePattern := regexp.MustCompile(`^.+\.([0-9]{3,})$`) + submatch := indexedFilePattern.FindSubmatch([]byte(fileInfo.Name())) + if len(submatch) != 0 { + // Matches + fileIndex, err := strconv.Atoi(string(submatch[1])) + if err != nil { + panic(err) + } + if maxIndex < fileIndex { + maxIndex = fileIndex + } + if minIndex == -1 || fileIndex < minIndex { + minIndex = fileIndex + } + } + } + } + + return GroupInfo{minIndex, maxIndex, totalSize, headSize} +} + +func filePathForIndex(headPath string, index int) string { + return fmt.Sprintf("%v.%03d", headPath, index) +} + +//-------------------------------------------------------------------------------- + +type GroupReader struct { + *Group + mtx sync.Mutex + curIndex int + curFile *os.File + curReader *bufio.Reader + curLine []byte +} + +func newGroupReader(g *Group) *GroupReader { + return &GroupReader{ + Group: g, + curIndex: -1, + curFile: nil, + curReader: nil, + curLine: nil, + } +} + +func (g *GroupReader) ReadLine() (string, error) { + g.mtx.Lock() + defer g.mtx.Unlock() + + // From PushLine + if g.curLine != nil { + line := string(g.curLine) + g.curLine = nil + return line, nil + } + + // Open file if not open yet + if g.curReader == nil { + err := g.openFile(0) + if err != nil { + return "", err + } + } + + // Iterate over files until line is found + for { + bytes, err := g.curReader.ReadBytes('\n') + if err != nil { + if err != io.EOF { + return string(bytes), err + } else { + // Open the next file + err := 
g.openFile(g.curIndex + 1) + if err != nil { + return "", err + } + } + } + } +} + +// CONTRACT: caller should hold g.mtx +func (g *GroupReader) openFile(index int) error { + + // Lock on Group to ensure that head doesn't move in the meanwhile. + g.Group.mtx.Lock() + defer g.Group.mtx.Unlock() + + curFilePath := filePathForIndex(g.Head.Path, index) + curFile, err := os.Open(curFilePath) + if err != nil { + return err + } + curReader := bufio.NewReader(curFile) + + // Update g.cur* + g.curIndex = index + g.curFile = curFile + g.curReader = curReader + g.curLine = nil + return nil +} + +func (g *GroupReader) PushLine(line string) { + g.mtx.Lock() + defer g.mtx.Unlock() + + if g.curLine == nil { + g.curLine = []byte(line) + } else { + panic("PushLine failed, already have line") + } +} + +// Cursor's file index. +func (g *GroupReader) CurIndex() int { + g.mtx.Lock() + defer g.mtx.Unlock() + return g.curIndex +} + +func (g *GroupReader) SetIndex(index int) { + g.mtx.Lock() + defer g.mtx.Unlock() + g.openFile(index) +} diff --git a/group_test.go b/group_test.go new file mode 100644 index 00000000..8c1b7b6a --- /dev/null +++ b/group_test.go @@ -0,0 +1,110 @@ +package autofile + +import ( + "testing" + + . "github.com/tendermint/go-common" +) + +func createTestGroup(t *testing.T, headPath string) *Group { + autofile, err := OpenAutoFile(headPath) + if err != nil { + t.Fatal("Error opening AutoFile", headPath, err) + } + g, err := OpenGroup(autofile) + if err != nil { + t.Fatal("Error opening Group", err) + } + return g +} + +func assertGroupInfo(t *testing.T, gInfo GroupInfo, minIndex, maxIndex int, totalSize, headSize int64) { + if gInfo.MinIndex != minIndex { + t.Errorf("GroupInfo MinIndex expected %v, got %v", minIndex, gInfo.MinIndex) + } + if gInfo.MaxIndex != maxIndex { + t.Errorf("GroupInfo MaxIndex expected %v, got %v", maxIndex, gInfo.MaxIndex) + } + if gInfo.TotalSize != totalSize { + t.Errorf("GroupInfo TotalSize expected %v, got %v", totalSize, gInfo.TotalSize) + } + if gInfo.HeadSize != headSize { + t.Errorf("GroupInfo HeadSize expected %v, got %v", headSize, gInfo.HeadSize) + } +} + +func TestCreateGroup(t *testing.T) { + testID := RandStr(12) + testDir := "_test_" + testID + err := EnsureDir(testDir, 0700) + if err != nil { + t.Fatal("Error creating dir", err) + } + + g := createTestGroup(t, testDir+"/myfile") + if g == nil { + t.Error("Failed to create Group") + } + g.SetHeadSizeLimit(1000 * 1000) + g.stopTicker() + + // At first, there are no files. + assertGroupInfo(t, g.ReadGroupInfo(), -1, -1, 0, 0) + + // Write 1000 bytes 999 times. + for i := 0; i < 999; i++ { + _, err := g.Head.Write([]byte(RandStr(999) + "\n")) + if err != nil { + t.Fatal("Error appending to head", err) + } + } + assertGroupInfo(t, g.ReadGroupInfo(), -1, -1, 999000, 999000) + + // Even calling checkHeadSizeLimit manually won't rotate it. + g.checkHeadSizeLimit() + assertGroupInfo(t, g.ReadGroupInfo(), -1, -1, 999000, 999000) + + // Write 1000 more bytes. + _, err = g.Head.Write([]byte(RandStr(999) + "\n")) + if err != nil { + t.Fatal("Error appending to head", err) + } + + // Calling checkHeadSizeLimit this time rolls it. + g.checkHeadSizeLimit() + assertGroupInfo(t, g.ReadGroupInfo(), 0, 0, 1000000, 0) + + // Write 1000 more bytes. + _, err = g.Head.Write([]byte(RandStr(999) + "\n")) + if err != nil { + t.Fatal("Error appending to head", err) + } + + // Calling checkHeadSizeLimit does nothing. 
+ g.checkHeadSizeLimit() + assertGroupInfo(t, g.ReadGroupInfo(), 0, 0, 1001000, 1000) + + // Write 1000 bytes 999 times. + for i := 0; i < 999; i++ { + _, err := g.Head.Write([]byte(RandStr(999) + "\n")) + if err != nil { + t.Fatal("Error appending to head", err) + } + } + assertGroupInfo(t, g.ReadGroupInfo(), 0, 0, 2000000, 1000000) + + // Calling checkHeadSizeLimit rolls it again. + g.checkHeadSizeLimit() + assertGroupInfo(t, g.ReadGroupInfo(), 0, 1, 2000000, 0) + + // Write 1000 more bytes. + _, err = g.Head.Write([]byte(RandStr(999) + "\n")) + if err != nil { + t.Fatal("Error appending to head", err) + } + assertGroupInfo(t, g.ReadGroupInfo(), 0, 1, 2001000, 1000) + + // Calling checkHeadSizeLimit does nothing. + g.checkHeadSizeLimit() + assertGroupInfo(t, g.ReadGroupInfo(), 0, 1, 2001000, 1000) +} diff --git a/sighup_watcher.go b/sighup_watcher.go new file mode 100644 index 00000000..facc238d --- /dev/null +++ b/sighup_watcher.go @@ -0,0 +1,63 @@ +package autofile + +import ( + "os" + "os/signal" + "sync" + "sync/atomic" + "syscall" +) + +func init() { + initSighupWatcher() +} + +var sighupWatchers *SighupWatcher +var sighupCounter int32 // For testing + +func initSighupWatcher() { + sighupWatchers = newSighupWatcher() + + c := make(chan os.Signal, 1) + signal.Notify(c, syscall.SIGHUP) + + go func() { + for _ = range c { + sighupWatchers.closeAll() + atomic.AddInt32(&sighupCounter, 1) + } + }() +} + +// Watchces for SIGHUP events and notifies registered AutoFiles +type SighupWatcher struct { + mtx sync.Mutex + autoFiles map[string]*AutoFile +} + +func newSighupWatcher() *SighupWatcher { + return &SighupWatcher{ + autoFiles: make(map[string]*AutoFile, 10), + } +} + +func (w *SighupWatcher) addAutoFile(af *AutoFile) { + w.mtx.Lock() + w.autoFiles[af.ID] = af + w.mtx.Unlock() +} + +// If AutoFile isn't registered or was already removed, does nothing. 
+func (w *SighupWatcher) removeAutoFile(af *AutoFile) { + w.mtx.Lock() + delete(w.autoFiles, af.ID) + w.mtx.Unlock() +} + +func (w *SighupWatcher) closeAll() { + w.mtx.Lock() + for _, af := range w.autoFiles { + af.closeFile() + } + w.mtx.Unlock() +} From c26b857900009ac81c78c1bc03f85e0c8e47818a Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Wed, 26 Oct 2016 21:50:07 -0700 Subject: [PATCH 03/17] Fix Search and add test --- group.go | 181 +++++++++++++++++++++++++++++++++---------------- group_test.go | 183 ++++++++++++++++++++++++++++++++++++++++++++------ 2 files changed, 284 insertions(+), 80 deletions(-) diff --git a/group.go b/group.go index c0d199e1..84aa8a22 100644 --- a/group.go +++ b/group.go @@ -24,6 +24,7 @@ with respect to the Group's Head (the AutoFile being appended to) */ const groupCheckDuration = 1000 * time.Millisecond +const defaultHeadSizeLimit = 10 * 1024 * 1024 // 10MB type Group struct { ID string @@ -33,17 +34,25 @@ type Group struct { mtx sync.Mutex headSizeLimit int64 totalSizeLimit int64 + minIndex int // Includes head + maxIndex int // Includes head, where Head will move to } func OpenGroup(head *AutoFile) (g *Group, err error) { dir := path.Dir(head.Path) g = &Group{ - ID: "group:" + head.ID, - Head: head, - Dir: dir, - ticker: time.NewTicker(groupCheckDuration), + ID: "group:" + head.ID, + Head: head, + Dir: dir, + ticker: time.NewTicker(groupCheckDuration), + headSizeLimit: defaultHeadSizeLimit, + minIndex: 0, + maxIndex: 0, } + gInfo := g.readGroupInfo() + g.minIndex = gInfo.MinIndex + g.maxIndex = gInfo.MaxIndex go g.processTicks() return } @@ -72,6 +81,12 @@ func (g *Group) TotalSizeLimit() int64 { return g.totalSizeLimit } +func (g *Group) MaxIndex() int { + g.mtx.Lock() + defer g.mtx.Unlock() + return g.maxIndex +} + func (g *Group) Close() error { g.ticker.Stop() return nil @@ -83,8 +98,8 @@ func (g *Group) processTicks() { if !ok { return // Done. } - // TODO Check head size limit - // TODO check total size limit + g.checkHeadSizeLimit() + g.checkTotalSizeLimit() } } @@ -112,8 +127,7 @@ func (g *Group) RotateFile() { g.mtx.Lock() defer g.mtx.Unlock() - gInfo := g.readGroupInfo() - dstPath := filePathForIndex(g.Head.Path, gInfo.MaxIndex+1) + dstPath := filePathForIndex(g.Head.Path, g.maxIndex) err := os.Rename(g.Head.Path, dstPath) if err != nil { panic(err) @@ -122,6 +136,7 @@ func (g *Group) RotateFile() { if err != nil { panic(err) } + g.maxIndex += 1 } func (g *Group) NewReader(index int) *GroupReader { @@ -135,23 +150,29 @@ type SearchFunc func(line string) (int, error) // Searches for the right file in Group, // then returns a GroupReader to start streaming lines +// Returns true if an exact match was found, otherwise returns +// the next greater line that starts with prefix. // CONTRACT: caller is responsible for closing GroupReader. -func (g *Group) Search(prefix string, cmp SearchFunc) (*GroupReader, error) { - gInfo := g.ReadGroupInfo() - minIndex, maxIndex := gInfo.MinIndex, gInfo.MaxIndex - curIndex := (minIndex + maxIndex + 1) / 2 +func (g *Group) Search(prefix string, cmp SearchFunc) (*GroupReader, bool, error) { + g.mtx.Lock() + minIndex, maxIndex := g.minIndex, g.maxIndex + g.mtx.Unlock() + // Now minIndex/maxIndex may change meanwhile, + // but it shouldn't be a big deal + // (maybe we'll want to limit scanUntil though) for { + curIndex := (minIndex + maxIndex + 1) / 2 // Base case, when there's only 1 choice left. 
if minIndex == maxIndex { r := g.NewReader(maxIndex) - err := scanUntil(r, prefix, cmp) + match, err := scanUntil(r, prefix, cmp) if err != nil { r.Close() - return nil, err + return nil, false, err } else { - return r, err + return r, match, err } } @@ -161,13 +182,13 @@ func (g *Group) Search(prefix string, cmp SearchFunc) (*GroupReader, error) { foundIndex, line, err := scanFirst(r, prefix) r.Close() if err != nil { - return nil, err + return nil, false, err } // Compare this line to our search query. val, err := cmp(line) if err != nil { - return nil, err + return nil, false, err } if val < 0 { // Line will come later @@ -175,12 +196,15 @@ func (g *Group) Search(prefix string, cmp SearchFunc) (*GroupReader, error) { } else if val == 0 { // Stroke of luck, found the line r := g.NewReader(foundIndex) - err := scanUntil(r, prefix, cmp) + match, err := scanUntil(r, prefix, cmp) + if !match { + panic("Expected match to be true") + } if err != nil { r.Close() - return nil, err + return nil, false, err } else { - return r, err + return r, true, err } } else { // We passed it @@ -205,24 +229,28 @@ func scanFirst(r *GroupReader, prefix string) (int, string, error) { } } -func scanUntil(r *GroupReader, prefix string, cmp SearchFunc) error { +// Returns true iff an exact match was found. +func scanUntil(r *GroupReader, prefix string, cmp SearchFunc) (bool, error) { for { line, err := r.ReadLine() if err != nil { - return err + return false, err } if !strings.HasPrefix(line, prefix) { continue } val, err := cmp(line) if err != nil { - return err + return false, err } if val < 0 { continue + } else if val == 0 { + r.PushLine(line) + return true, nil } else { r.PushLine(line) - return nil + return false, nil } } } @@ -241,6 +269,7 @@ func (g *Group) ReadGroupInfo() GroupInfo { return g.readGroupInfo() } +// Index includes the head. // CONTRACT: caller should have called g.mtx.Lock func (g *Group) readGroupInfo() GroupInfo { groupDir := filepath.Dir(g.Head.Path) @@ -285,6 +314,15 @@ func (g *Group) readGroupInfo() GroupInfo { } } + // Now account for the head. + if minIndex == -1 { + // If there were no numbered files, + // then the head is index 0. 
+ minIndex, maxIndex = 0, 0 + } else { + // Otherwise, the head file is 1 greater + maxIndex += 1 + } return GroupInfo{minIndex, maxIndex, totalSize, headSize} } @@ -306,27 +344,43 @@ type GroupReader struct { func newGroupReader(g *Group) *GroupReader { return &GroupReader{ Group: g, - curIndex: -1, + curIndex: 0, curFile: nil, curReader: nil, curLine: nil, } } -func (g *GroupReader) ReadLine() (string, error) { - g.mtx.Lock() - defer g.mtx.Unlock() +func (gr *GroupReader) Close() error { + gr.mtx.Lock() + defer gr.mtx.Unlock() + + if gr.curReader != nil { + err := gr.curFile.Close() + gr.curIndex = 0 + gr.curReader = nil + gr.curFile = nil + gr.curLine = nil + return err + } else { + return nil + } +} + +func (gr *GroupReader) ReadLine() (string, error) { + gr.mtx.Lock() + defer gr.mtx.Unlock() // From PushLine - if g.curLine != nil { - line := string(g.curLine) - g.curLine = nil + if gr.curLine != nil { + line := string(gr.curLine) + gr.curLine = nil return line, nil } // Open file if not open yet - if g.curReader == nil { - err := g.openFile(0) + if gr.curReader == nil { + err := gr.openFile(gr.curIndex) if err != nil { return "", err } @@ -334,63 +388,74 @@ func (g *GroupReader) ReadLine() (string, error) { // Iterate over files until line is found for { - bytes, err := g.curReader.ReadBytes('\n') + bytes, err := gr.curReader.ReadBytes('\n') if err != nil { if err != io.EOF { return string(bytes), err } else { // Open the next file - err := g.openFile(g.curIndex + 1) + err := gr.openFile(gr.curIndex + 1) if err != nil { return "", err } + continue } } + return string(bytes), nil } } -// CONTRACT: caller should hold g.mtx -func (g *GroupReader) openFile(index int) error { +// IF index > gr.Group.maxIndex, returns io.EOF +// CONTRACT: caller should hold gr.mtx +func (gr *GroupReader) openFile(index int) error { // Lock on Group to ensure that head doesn't move in the meanwhile. - g.Group.mtx.Lock() - defer g.Group.mtx.Unlock() + gr.Group.mtx.Lock() + defer gr.Group.mtx.Unlock() + + var curFilePath string + if index == gr.Group.maxIndex { + curFilePath = gr.Head.Path + } else if index > gr.Group.maxIndex { + return io.EOF + } else { + curFilePath = filePathForIndex(gr.Head.Path, index) + } - curFilePath := filePathForIndex(g.Head.Path, index) curFile, err := os.Open(curFilePath) if err != nil { return err } curReader := bufio.NewReader(curFile) - // Update g.cur* - g.curIndex = index - g.curFile = curFile - g.curReader = curReader - g.curLine = nil + // Update gr.cur* + gr.curIndex = index + gr.curFile = curFile + gr.curReader = curReader + gr.curLine = nil return nil } -func (g *GroupReader) PushLine(line string) { - g.mtx.Lock() - defer g.mtx.Unlock() +func (gr *GroupReader) PushLine(line string) { + gr.mtx.Lock() + defer gr.mtx.Unlock() - if g.curLine == nil { - g.curLine = []byte(line) + if gr.curLine == nil { + gr.curLine = []byte(line) } else { panic("PushLine failed, already have line") } } // Cursor's file index. 
-func (g *GroupReader) CurIndex() int { - g.mtx.Lock() - defer g.mtx.Unlock() - return g.curIndex +func (gr *GroupReader) CurIndex() int { + gr.mtx.Lock() + defer gr.mtx.Unlock() + return gr.curIndex } -func (g *GroupReader) SetIndex(index int) { - g.mtx.Lock() - defer g.mtx.Unlock() - g.openFile(index) +func (gr *GroupReader) SetIndex(index int) { + gr.mtx.Lock() + defer gr.mtx.Unlock() + gr.openFile(index) } diff --git a/group_test.go b/group_test.go index 8c1b7b6a..ced88c8b 100644 --- a/group_test.go +++ b/group_test.go @@ -1,12 +1,25 @@ package autofile import ( + "errors" + "io" + "os" + "strconv" + "strings" "testing" . "github.com/tendermint/go-common" ) -func createTestGroup(t *testing.T, headPath string) *Group { +// NOTE: Returned group has ticker stopped +func createTestGroup(t *testing.T, headSizeLimit int64) *Group { + testID := RandStr(12) + testDir := "_test_" + testID + err := EnsureDir(testDir, 0700) + if err != nil { + t.Fatal("Error creating dir", err) + } + headPath := testDir + "/myfile" autofile, err := OpenAutoFile(headPath) if err != nil { t.Fatal("Error opening AutoFile", headPath, err) @@ -15,9 +28,18 @@ func createTestGroup(t *testing.T, headPath string) *Group { if err != nil { t.Fatal("Error opening Group", err) } + g.SetHeadSizeLimit(headSizeLimit) + g.stopTicker() return g } +func destroyTestGroup(t *testing.T, g *Group) { + err := os.RemoveAll(g.Dir) + if err != nil { + t.Fatal("Error removing test Group directory", err) + } +} + func assertGroupInfo(t *testing.T, gInfo GroupInfo, minIndex, maxIndex int, totalSize, headSize int64) { if gInfo.MinIndex != minIndex { t.Errorf("GroupInfo MinIndex expected %v, got %v", minIndex, gInfo.MinIndex) @@ -33,23 +55,14 @@ func assertGroupInfo(t *testing.T, gInfo GroupInfo, minIndex, maxIndex int, tota } } -func TestCreateGroup(t *testing.T) { - testID := RandStr(12) - testDir := "_test_" + testID - err := EnsureDir(testDir, 0700) - if err != nil { - t.Fatal("Error creating dir", err) - } - - g := createTestGroup(t, testDir+"/myfile") +func TestCheckHeadSizeLimit(t *testing.T) { + g := createTestGroup(t, 1000*1000) if g == nil { t.Error("Failed to create Group") } - g.SetHeadSizeLimit(1000 * 1000) - g.stopTicker() // At first, there are no files. - assertGroupInfo(t, g.ReadGroupInfo(), -1, -1, 0, 0) + assertGroupInfo(t, g.ReadGroupInfo(), 0, 0, 0, 0) // Write 1000 bytes 999 times. for i := 0; i < 999; i++ { @@ -58,21 +71,21 @@ func TestCreateGroup(t *testing.T) { t.Fatal("Error appending to head", err) } } - assertGroupInfo(t, g.ReadGroupInfo(), -1, -1, 999000, 999000) + assertGroupInfo(t, g.ReadGroupInfo(), 0, 0, 999000, 999000) // Even calling checkHeadSizeLimit manually won't rotate it. g.checkHeadSizeLimit() - assertGroupInfo(t, g.ReadGroupInfo(), -1, -1, 999000, 999000) + assertGroupInfo(t, g.ReadGroupInfo(), 0, 0, 999000, 999000) // Write 1000 more bytes. - _, err = g.Head.Write([]byte(RandStr(999) + "\n")) + _, err := g.Head.Write([]byte(RandStr(999) + "\n")) if err != nil { t.Fatal("Error appending to head", err) } // Calling checkHeadSizeLimit this time rolls it. g.checkHeadSizeLimit() - assertGroupInfo(t, g.ReadGroupInfo(), 0, 0, 1000000, 0) + assertGroupInfo(t, g.ReadGroupInfo(), 0, 1, 1000000, 0) // Write 1000 more bytes. _, err = g.Head.Write([]byte(RandStr(999) + "\n")) @@ -82,7 +95,7 @@ func TestCreateGroup(t *testing.T) { // Calling checkHeadSizeLimit does nothing. 
g.checkHeadSizeLimit() - assertGroupInfo(t, g.ReadGroupInfo(), 0, 0, 1001000, 1000) + assertGroupInfo(t, g.ReadGroupInfo(), 0, 1, 1001000, 1000) // Write 1000 bytes 999 times. for i := 0; i < 999; i++ { @@ -91,20 +104,146 @@ func TestCreateGroup(t *testing.T) { t.Fatal("Error appending to head", err) } } - assertGroupInfo(t, g.ReadGroupInfo(), 0, 0, 2000000, 1000000) + assertGroupInfo(t, g.ReadGroupInfo(), 0, 1, 2000000, 1000000) // Calling checkHeadSizeLimit rolls it again. g.checkHeadSizeLimit() - assertGroupInfo(t, g.ReadGroupInfo(), 0, 1, 2000000, 0) + assertGroupInfo(t, g.ReadGroupInfo(), 0, 2, 2000000, 0) // Write 1000 more bytes. _, err = g.Head.Write([]byte(RandStr(999) + "\n")) if err != nil { t.Fatal("Error appending to head", err) } - assertGroupInfo(t, g.ReadGroupInfo(), 0, 1, 2001000, 1000) + assertGroupInfo(t, g.ReadGroupInfo(), 0, 2, 2001000, 1000) // Calling checkHeadSizeLimit does nothing. g.checkHeadSizeLimit() - assertGroupInfo(t, g.ReadGroupInfo(), 0, 1, 2001000, 1000) + assertGroupInfo(t, g.ReadGroupInfo(), 0, 2, 2001000, 1000) + + // Cleanup + destroyTestGroup(t, g) +} + +func TestSearch(t *testing.T) { + g := createTestGroup(t, 10*1000) + if g == nil { + t.Error("Failed to create Group") + } + + // Create some files in the group that have several INFO lines in them. + // Try to put the INFO lines in various spots. + for i := 0; i < 100; i++ { + // The random junk at the end ensures that this INFO linen + // is equally likely to show up at the end. + _, err := g.Head.Write([]byte(Fmt("INFO %v %v\n", i, RandStr(123)))) + if err != nil { + t.Error("Failed to write to head") + } + g.checkHeadSizeLimit() + for j := 0; j < 10; j++ { + _, err := g.Head.Write([]byte(RandStr(123) + "\n")) + if err != nil { + t.Error("Failed to write to head") + } + g.checkHeadSizeLimit() + } + } + + // Create a search func that searches for line + makeSearchFunc := func(target int) SearchFunc { + return func(line string) (int, error) { + parts := strings.Split(line, " ") + if len(parts) != 3 { + return -1, errors.New("Line did not have 3 parts") + } + i, err := strconv.Atoi(parts[1]) + if err != nil { + return -1, errors.New("Failed to parse INFO: " + err.Error()) + } + if target < i { + return 1, nil + } else if target == i { + return 0, nil + } else { + return -1, nil + } + } + } + + // Now search for each number + for i := 0; i < 100; i++ { + t.Log("Testing for i", i) + gr, match, err := g.Search("INFO", makeSearchFunc(i)) + if err != nil { + t.Fatal("Failed to search for line:", err) + } + if !match { + t.Error("Expected Search to return exact match") + } + line, err := gr.ReadLine() + if err != nil { + t.Fatal("Failed to read line after search", err) + } + if !strings.HasPrefix(line, Fmt("INFO %v ", i)) { + t.Fatal("Failed to get correct line") + } + // Make sure we can continue to read from there. + cur := i + 1 + for { + line, err := gr.ReadLine() + if err == io.EOF { + if cur == 99+1 { + // OK! + break + } else { + t.Fatal("Got EOF after the wrong INFO #") + } + } else if err != nil { + t.Fatal("Error reading line", err) + } + if !strings.HasPrefix(line, "INFO ") { + continue + } + if !strings.HasPrefix(line, Fmt("INFO %v ", cur)) { + t.Fatalf("Unexpected INFO #. Expected %v got:\n%v", cur, line) + } + cur += 1 + } + gr.Close() + } + + // Now search for something that is too small. + // We should get the first available line. 
+ { + gr, match, err := g.Search("INFO", makeSearchFunc(-999)) + if err != nil { + t.Fatal("Failed to search for line:", err) + } + if match { + t.Error("Expected Search to not return exact match") + } + line, err := gr.ReadLine() + if err != nil { + t.Fatal("Failed to read line after search", err) + } + if !strings.HasPrefix(line, "INFO 0 ") { + t.Error("Failed to fetch correct line, which is the earliest INFO") + } + } + + // Now search for something that is too large. + // We should get an EOF error. + { + gr, _, err := g.Search("INFO", makeSearchFunc(999)) + if err != io.EOF { + t.Error("Expected to get an EOF error") + } + if gr != nil { + t.Error("Expected to get nil GroupReader") + } + } + + // Cleanup + destroyTestGroup(t, g) } From d741b81ab5634483d3abc45615cd10b1befc99bc Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Wed, 26 Oct 2016 22:11:43 -0700 Subject: [PATCH 04/17] Add better docs for Group --- group.go | 24 ++++++++++++++++++++++-- group_test.go | 4 ++++ 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/group.go b/group.go index 84aa8a22..04403bb7 100644 --- a/group.go +++ b/group.go @@ -19,8 +19,28 @@ You can open a Group to keep restrictions on an AutoFile, like the maximum size of each chunk, and/or the total amount of bytes stored in the group. -The Group can also be used to binary-search, and to read atomically -with respect to the Group's Head (the AutoFile being appended to) +The first file to be written in the Group.Dir is the head file. + + Dir/ + - + +Once the Head file reaches the size limit, it will be rotated. + + Dir/ + - .000 // First rolled file + - // New head path, starts empty. + // The implicit index is 001. + +As more files are written, the index numbers grow... + + Dir/ + - .000 // First rolled file + - .001 // Second rolled file + - ... + - // New head path + +The Group can also be used to binary-search for some line, +assuming that marker lines are written occasionally. */ const groupCheckDuration = 1000 * time.Millisecond diff --git a/group_test.go b/group_test.go index ced88c8b..f7c70b70 100644 --- a/group_test.go +++ b/group_test.go @@ -230,6 +230,10 @@ func TestSearch(t *testing.T) { if !strings.HasPrefix(line, "INFO 0 ") { t.Error("Failed to fetch correct line, which is the earliest INFO") } + err = gr.Close() + if err != nil { + t.Error("Failed to close GroupReader", err) + } } // Now search for something that is too large. 
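
The rolled-file layout documented in the patch above can be exercised directly with the API as it stands after PATCH 04/17: open an AutoFile, wrap it in a Group, write through g.Head, and either call RotateFile yourself or let the ticker roll the head once it passes the size limit. The following is a minimal, illustrative sketch only; the _demo path, the 1000-byte limit, and the github.com/tendermint/go-autofile import path are assumptions, and later patches in this series change OpenGroup to take a path and add WriteLine/Flush.

package main

import (
	"fmt"
	"os"

	autofile "github.com/tendermint/go-autofile" // assumed import path
)

func main() {
	// Head file lives at _demo/myfile; rolled chunks become _demo/myfile.000, .001, ...
	if err := os.MkdirAll("_demo", 0700); err != nil {
		panic(err)
	}
	af, err := autofile.OpenAutoFile("_demo/myfile")
	if err != nil {
		panic(err)
	}
	g, err := autofile.OpenGroup(af)
	if err != nil {
		panic(err)
	}
	g.SetHeadSizeLimit(1000) // roll the head once it reaches ~1000 bytes

	// At this point in the series, writes go straight to the head AutoFile.
	for i := 0; i < 10; i++ {
		if _, err := g.Head.Write([]byte(fmt.Sprintf("INFO %v\n", i))); err != nil {
			panic(err)
		}
	}
	g.RotateFile() // or let the ticker's checkHeadSizeLimit do it

	fmt.Printf("%+v\n", g.ReadGroupInfo()) // MinIndex, MaxIndex, TotalSize, HeadSize
	g.Close()
	af.Close()
}

After RotateFile, _demo/ holds myfile.000 containing the ten INFO lines; the next write through g.Head recreates an empty myfile head, matching the Dir/ layout shown in the doc comment added by this patch.
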
From 03110423360a1ad2440f510102577d3500404006 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Fri, 28 Oct 2016 09:10:33 -0700 Subject: [PATCH 05/17] Add CHALLENGE --- group.go | 1 + 1 file changed, 1 insertion(+) diff --git a/group.go b/group.go index 04403bb7..f382ea0b 100644 --- a/group.go +++ b/group.go @@ -141,6 +141,7 @@ func (g *Group) checkHeadSizeLimit() { func (g *Group) checkTotalSizeLimit() { // TODO enforce total size limit + // CHALLENGE } func (g *Group) RotateFile() { From 5e9c5dc413eb3d4455567e1c84c5324510fd9c6e Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Fri, 28 Oct 2016 13:56:31 -0700 Subject: [PATCH 06/17] Add Group.WriteLine --- group.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/group.go b/group.go index f382ea0b..48c770ca 100644 --- a/group.go +++ b/group.go @@ -107,6 +107,14 @@ func (g *Group) MaxIndex() int { return g.maxIndex } +// Auto appends "\n" +// TODO: Make it halt if space is unavailable +func (g *Group) WriteLine(line string) error { + _, err := g.Head.Write([]byte(line + "\n")) + return err +} + +// NOTE: g.Head must be closed separately func (g *Group) Close() error { g.ticker.Stop() return nil From 916f3d789b6afaf7bfe161aeec391c8a35e354a8 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Fri, 28 Oct 2016 14:50:46 -0700 Subject: [PATCH 07/17] Size() returns 0 if file doesn't exist --- autofile.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/autofile.go b/autofile.go index ed9d549b..36de9984 100644 --- a/autofile.go +++ b/autofile.go @@ -107,6 +107,16 @@ func (af *AutoFile) openFile() error { func (af *AutoFile) Size() (int64, error) { af.mtx.Lock() defer af.mtx.Unlock() + if af.file == nil { + err := af.openFile() + if err != nil { + if err == os.ErrNotExist { + return 0, nil + } else { + return -1, err + } + } + } stat, err := af.file.Stat() if err != nil { return -1, err From 1261fca1608264cd14635585b6948ab359c88e37 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Sun, 30 Oct 2016 02:40:39 -0700 Subject: [PATCH 08/17] FindLast --- group.go | 96 ++++++++++++++++++++++++++---- group_test.go | 161 ++++++++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 235 insertions(+), 22 deletions(-) diff --git a/group.go b/group.go index 48c770ca..331b7e9e 100644 --- a/group.go +++ b/group.go @@ -168,10 +168,16 @@ func (g *Group) RotateFile() { g.maxIndex += 1 } -func (g *Group) NewReader(index int) *GroupReader { +// NOTE: if error, returns no GroupReader. +// CONTRACT: Caller must close the returned GroupReader +func (g *Group) NewReader(index int) (*GroupReader, error) { r := newGroupReader(g) - r.SetIndex(index) - return r + err := r.SetIndex(index) + if err != nil { + return nil, err + } else { + return r, nil + } } // Returns -1 if line comes after, 0 if found, 1 if line comes before. @@ -181,7 +187,7 @@ type SearchFunc func(line string) (int, error) // then returns a GroupReader to start streaming lines // Returns true if an exact match was found, otherwise returns // the next greater line that starts with prefix. -// CONTRACT: caller is responsible for closing GroupReader. +// CONTRACT: Caller must close the returned GroupReader func (g *Group) Search(prefix string, cmp SearchFunc) (*GroupReader, bool, error) { g.mtx.Lock() minIndex, maxIndex := g.minIndex, g.maxIndex @@ -195,7 +201,10 @@ func (g *Group) Search(prefix string, cmp SearchFunc) (*GroupReader, bool, error // Base case, when there's only 1 choice left. 
if minIndex == maxIndex { - r := g.NewReader(maxIndex) + r, err := g.NewReader(maxIndex) + if err != nil { + return nil, false, err + } match, err := scanUntil(r, prefix, cmp) if err != nil { r.Close() @@ -207,8 +216,11 @@ func (g *Group) Search(prefix string, cmp SearchFunc) (*GroupReader, bool, error // Read starting roughly at the middle file, // until we find line that has prefix. - r := g.NewReader(curIndex) - foundIndex, line, err := scanFirst(r, prefix) + r, err := g.NewReader(curIndex) + if err != nil { + return nil, false, err + } + foundIndex, line, err := scanNext(r, prefix) r.Close() if err != nil { return nil, false, err @@ -224,7 +236,10 @@ func (g *Group) Search(prefix string, cmp SearchFunc) (*GroupReader, bool, error minIndex = foundIndex } else if val == 0 { // Stroke of luck, found the line - r := g.NewReader(foundIndex) + r, err := g.NewReader(foundIndex) + if err != nil { + return nil, false, err + } match, err := scanUntil(r, prefix, cmp) if !match { panic("Expected match to be true") @@ -244,7 +259,8 @@ func (g *Group) Search(prefix string, cmp SearchFunc) (*GroupReader, bool, error } // Scans and returns the first line that starts with 'prefix' -func scanFirst(r *GroupReader, prefix string) (int, string, error) { +// Consumes line and returns it. +func scanNext(r *GroupReader, prefix string) (int, string, error) { for { line, err := r.ReadLine() if err != nil { @@ -259,6 +275,7 @@ func scanFirst(r *GroupReader, prefix string) (int, string, error) { } // Returns true iff an exact match was found. +// Pushes line, does not consume it. func scanUntil(r *GroupReader, prefix string, cmp SearchFunc) (bool, error) { for { line, err := r.ReadLine() @@ -284,6 +301,47 @@ func scanUntil(r *GroupReader, prefix string, cmp SearchFunc) (bool, error) { } } +// Searches for the last line in Group with prefix. +func (g *Group) FindLast(prefix string) (match string, found bool, err error) { + g.mtx.Lock() + minIndex, maxIndex := g.minIndex, g.maxIndex + g.mtx.Unlock() + + r, err := g.NewReader(maxIndex) + if err != nil { + return "", false, err + } + defer r.Close() + + // Open files from the back and read +GROUP_LOOP: + for i := maxIndex; i >= minIndex; i-- { + err := r.SetIndex(i) + if err != nil { + return "", false, err + } + // Scan each line and test whether line matches + for { + line, err := r.ReadLineInCurrent() + if err == io.EOF { + if found { + return match, found, nil + } else { + continue GROUP_LOOP + } + } else if err != nil { + return "", false, err + } + if strings.HasPrefix(line, prefix) { + match = line + found = true + } + } + } + + return +} + type GroupInfo struct { MinIndex int MaxIndex int @@ -399,6 +457,18 @@ func (gr *GroupReader) Close() error { func (gr *GroupReader) ReadLine() (string, error) { gr.mtx.Lock() defer gr.mtx.Unlock() + return gr.readLineWithOptions(false) +} + +func (gr *GroupReader) ReadLineInCurrent() (string, error) { + gr.mtx.Lock() + defer gr.mtx.Unlock() + return gr.readLineWithOptions(true) +} + +// curFileOnly: if True, do not open new files, +// just return io.EOF if no new lines found. 
+func (gr *GroupReader) readLineWithOptions(curFileOnly bool) (string, error) { // From PushLine if gr.curLine != nil { @@ -420,7 +490,9 @@ func (gr *GroupReader) ReadLine() (string, error) { bytes, err := gr.curReader.ReadBytes('\n') if err != nil { if err != io.EOF { - return string(bytes), err + return "", err + } else if curFileOnly { + return "", err } else { // Open the next file err := gr.openFile(gr.curIndex + 1) @@ -483,8 +555,8 @@ func (gr *GroupReader) CurIndex() int { return gr.curIndex } -func (gr *GroupReader) SetIndex(index int) { +func (gr *GroupReader) SetIndex(index int) error { gr.mtx.Lock() defer gr.mtx.Unlock() - gr.openFile(index) + return gr.openFile(index) } diff --git a/group_test.go b/group_test.go index f7c70b70..672bd4d9 100644 --- a/group_test.go +++ b/group_test.go @@ -3,6 +3,7 @@ package autofile import ( "errors" "io" + "io/ioutil" "os" "strconv" "strings" @@ -30,6 +31,10 @@ func createTestGroup(t *testing.T, headSizeLimit int64) *Group { } g.SetHeadSizeLimit(headSizeLimit) g.stopTicker() + + if g == nil { + t.Fatal("Failed to create Group") + } return g } @@ -57,16 +62,13 @@ func assertGroupInfo(t *testing.T, gInfo GroupInfo, minIndex, maxIndex int, tota func TestCheckHeadSizeLimit(t *testing.T) { g := createTestGroup(t, 1000*1000) - if g == nil { - t.Error("Failed to create Group") - } // At first, there are no files. assertGroupInfo(t, g.ReadGroupInfo(), 0, 0, 0, 0) // Write 1000 bytes 999 times. for i := 0; i < 999; i++ { - _, err := g.Head.Write([]byte(RandStr(999) + "\n")) + err := g.WriteLine(RandStr(999)) if err != nil { t.Fatal("Error appending to head", err) } @@ -78,7 +80,7 @@ func TestCheckHeadSizeLimit(t *testing.T) { assertGroupInfo(t, g.ReadGroupInfo(), 0, 0, 999000, 999000) // Write 1000 more bytes. - _, err := g.Head.Write([]byte(RandStr(999) + "\n")) + err := g.WriteLine(RandStr(999)) if err != nil { t.Fatal("Error appending to head", err) } @@ -88,7 +90,7 @@ func TestCheckHeadSizeLimit(t *testing.T) { assertGroupInfo(t, g.ReadGroupInfo(), 0, 1, 1000000, 0) // Write 1000 more bytes. - _, err = g.Head.Write([]byte(RandStr(999) + "\n")) + err = g.WriteLine(RandStr(999)) if err != nil { t.Fatal("Error appending to head", err) } @@ -99,7 +101,7 @@ func TestCheckHeadSizeLimit(t *testing.T) { // Write 1000 bytes 999 times. for i := 0; i < 999; i++ { - _, err := g.Head.Write([]byte(RandStr(999) + "\n")) + err := g.WriteLine(RandStr(999)) if err != nil { t.Fatal("Error appending to head", err) } @@ -127,9 +129,6 @@ func TestCheckHeadSizeLimit(t *testing.T) { func TestSearch(t *testing.T) { g := createTestGroup(t, 10*1000) - if g == nil { - t.Error("Failed to create Group") - } // Create some files in the group that have several INFO lines in them. // Try to put the INFO lines in various spots. 
@@ -251,3 +250,145 @@ func TestSearch(t *testing.T) { // Cleanup destroyTestGroup(t, g) } + +func TestRotateFile(t *testing.T) { + g := createTestGroup(t, 0) + g.WriteLine("Line 1") + g.WriteLine("Line 2") + g.WriteLine("Line 3") + g.RotateFile() + g.WriteLine("Line 4") + g.WriteLine("Line 5") + g.WriteLine("Line 6") + + // Read g.Head.Path+"000" + body1, err := ioutil.ReadFile(g.Head.Path + ".000") + if err != nil { + t.Error("Failed to read first rolled file") + } + if string(body1) != "Line 1\nLine 2\nLine 3\n" { + t.Errorf("Got unexpected contents: [%v]", string(body1)) + } + + // Read g.Head.Path + body2, err := ioutil.ReadFile(g.Head.Path) + if err != nil { + t.Error("Failed to read first rolled file") + } + if string(body2) != "Line 4\nLine 5\nLine 6\n" { + t.Errorf("Got unexpected contents: [%v]", string(body2)) + } + + // Cleanup + destroyTestGroup(t, g) +} + +func TestFindLast1(t *testing.T) { + g := createTestGroup(t, 0) + + g.WriteLine("Line 1") + g.WriteLine("Line 2") + g.WriteLine("# a") + g.WriteLine("Line 3") + g.RotateFile() + g.WriteLine("Line 4") + g.WriteLine("Line 5") + g.WriteLine("Line 6") + g.WriteLine("# b") + + match, found, err := g.FindLast("#") + if err != nil { + t.Error("Unexpected error", err) + } + if !found { + t.Error("Expected found=True") + } + if match != "# b\n" { + t.Errorf("Unexpected match: [%v]", match) + } + + // Cleanup + destroyTestGroup(t, g) +} + +func TestFindLast2(t *testing.T) { + g := createTestGroup(t, 0) + + g.WriteLine("Line 1") + g.WriteLine("Line 2") + g.WriteLine("Line 3") + g.RotateFile() + g.WriteLine("# a") + g.WriteLine("Line 4") + g.WriteLine("Line 5") + g.WriteLine("# b") + g.WriteLine("Line 6") + + match, found, err := g.FindLast("#") + if err != nil { + t.Error("Unexpected error", err) + } + if !found { + t.Error("Expected found=True") + } + if match != "# b\n" { + t.Errorf("Unexpected match: [%v]", match) + } + + // Cleanup + destroyTestGroup(t, g) +} + +func TestFindLast3(t *testing.T) { + g := createTestGroup(t, 0) + + g.WriteLine("Line 1") + g.WriteLine("# a") + g.WriteLine("Line 2") + g.WriteLine("# b") + g.WriteLine("Line 3") + g.RotateFile() + g.WriteLine("Line 4") + g.WriteLine("Line 5") + g.WriteLine("Line 6") + + match, found, err := g.FindLast("#") + if err != nil { + t.Error("Unexpected error", err) + } + if !found { + t.Error("Expected found=True") + } + if match != "# b\n" { + t.Errorf("Unexpected match: [%v]", match) + } + + // Cleanup + destroyTestGroup(t, g) +} + +func TestFindLast4(t *testing.T) { + g := createTestGroup(t, 0) + + g.WriteLine("Line 1") + g.WriteLine("Line 2") + g.WriteLine("Line 3") + g.RotateFile() + g.WriteLine("Line 4") + g.WriteLine("Line 5") + g.WriteLine("Line 6") + + match, found, err := g.FindLast("#") + if err != nil { + t.Error("Unexpected error", err) + } + if found { + t.Error("Expected found=False") + } + if match != "" { + t.Errorf("Unexpected match: [%v]", match) + } + + // Cleanup + destroyTestGroup(t, g) +} From dc8fa06e642c53339987acfd90154c81c1ab4c6d Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Sat, 5 Nov 2016 17:58:50 -0700 Subject: [PATCH 09/17] Add MakeSimpleSearchFunc --- autofile.go | 4 ++++ group.go | 46 +++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 45 insertions(+), 5 deletions(-) diff --git a/autofile.go b/autofile.go index 36de9984..93ae8ec8 100644 --- a/autofile.go +++ b/autofile.go @@ -95,6 +95,10 @@ func (af *AutoFile) Write(b []byte) (n int, err error) { return af.file.Write(b) } +func (af *AutoFile) Sync() error { + return af.file.Sync() +} 
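+// NOTE: unlike Write and Size above, Sync does not take af.mtx before
+// touching af.file, so it relies on the caller to avoid racing a
+// concurrent close (ticker tick or SIGHUP).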
+ func (af *AutoFile) openFile() error { file, err := os.OpenFile(af.Path, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0600) if err != nil { diff --git a/group.go b/group.go index 331b7e9e..0788b163 100644 --- a/group.go +++ b/group.go @@ -2,6 +2,7 @@ package autofile import ( "bufio" + "errors" "fmt" "io" "os" @@ -12,6 +13,8 @@ import ( "strings" "sync" "time" + + . "github.com/tendermint/go-common" ) /* @@ -56,6 +59,9 @@ type Group struct { totalSizeLimit int64 minIndex int // Includes head maxIndex int // Includes head, where Head will move to + + // TODO: When we start deleting files, we need to start tracking GroupReaders + // and their dependencies. } func OpenGroup(head *AutoFile) (g *Group, err error) { @@ -183,10 +189,10 @@ func (g *Group) NewReader(index int) (*GroupReader, error) { // Returns -1 if line comes after, 0 if found, 1 if line comes before. type SearchFunc func(line string) (int, error) -// Searches for the right file in Group, -// then returns a GroupReader to start streaming lines -// Returns true if an exact match was found, otherwise returns -// the next greater line that starts with prefix. +// Searches for the right file in Group, then returns a GroupReader to start +// streaming lines. +// Returns true if an exact match was found, otherwise returns the next greater +// line that starts with prefix. // CONTRACT: Caller must close the returned GroupReader func (g *Group) Search(prefix string, cmp SearchFunc) (*GroupReader, bool, error) { g.mtx.Lock() @@ -301,7 +307,8 @@ func scanUntil(r *GroupReader, prefix string, cmp SearchFunc) (bool, error) { } } -// Searches for the last line in Group with prefix. +// Searches backwards for the last line in Group with prefix. +// Scans each file forward until the end to find the last match. func (g *Group) FindLast(prefix string) (match string, found bool, err error) { g.mtx.Lock() minIndex, maxIndex := g.minIndex, g.maxIndex @@ -560,3 +567,32 @@ func (gr *GroupReader) SetIndex(index int) error { defer gr.mtx.Unlock() return gr.openFile(index) } + +//-------------------------------------------------------------------------------- + +// A simple SearchFunc that assumes that the marker is of form +// . +// For example, if prefix is '#HEIGHT:', the markers of expected to be of the form: +// +// #HEIGHT:1 +// ... +// #HEIGHT:2 +// ... +func MakeSimpleSearchFunc(prefix string, target int) SearchFunc { + return func(line string) (int, error) { + if !strings.HasPrefix(line, prefix) { + return -1, errors.New(Fmt("Marker line did not have prefix: %v", prefix)) + } + i, err := strconv.Atoi(line[len(prefix):]) + if err != nil { + return -1, errors.New(Fmt("Failed to parse marker line: %v", err.Error())) + } + if target < i { + return 1, nil + } else if target == i { + return 0, nil + } else { + return -1, nil + } + } +} From d1848762cf184eb76d50664ba6b568080c067137 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Sun, 20 Nov 2016 17:19:15 -0800 Subject: [PATCH 10/17] Fix issue where buffered writes may split a line to two files --- autofile.go | 8 ++++++-- group.go | 55 +++++++++++++++++++++++++++-------------------------- 2 files changed, 34 insertions(+), 29 deletions(-) diff --git a/autofile.go b/autofile.go index 93ae8ec8..60a314a4 100644 --- a/autofile.go +++ b/autofile.go @@ -1,10 +1,11 @@ package autofile import ( - . "github.com/tendermint/go-common" "os" "sync" "time" + + . 
"github.com/tendermint/go-common" ) /* AutoFile usage @@ -87,12 +88,15 @@ func (af *AutoFile) closeFile() (err error) { func (af *AutoFile) Write(b []byte) (n int, err error) { af.mtx.Lock() defer af.mtx.Unlock() + if af.file == nil { if err = af.openFile(); err != nil { return } } - return af.file.Write(b) + + n, err = af.file.Write(b) + return } func (af *AutoFile) Sync() error { diff --git a/group.go b/group.go index 0788b163..1c984249 100644 --- a/group.go +++ b/group.go @@ -3,7 +3,6 @@ package autofile import ( "bufio" "errors" - "fmt" "io" "os" "path" @@ -52,7 +51,8 @@ const defaultHeadSizeLimit = 10 * 1024 * 1024 // 10MB type Group struct { ID string Head *AutoFile // The head AutoFile to write to - Dir string // Directory that contains .Head + headBuf *bufio.Writer + Dir string // Directory that contains .Head ticker *time.Ticker mtx sync.Mutex headSizeLimit int64 @@ -70,6 +70,7 @@ func OpenGroup(head *AutoFile) (g *Group, err error) { g = &Group{ ID: "group:" + head.ID, Head: head, + headBuf: bufio.NewWriterSize(head, 4096*10), Dir: dir, ticker: time.NewTicker(groupCheckDuration), headSizeLimit: defaultHeadSizeLimit, @@ -114,9 +115,10 @@ func (g *Group) MaxIndex() int { } // Auto appends "\n" +// NOTE: Writes are buffered so they don't write synchronously // TODO: Make it halt if space is unavailable func (g *Group) WriteLine(line string) error { - _, err := g.Head.Write([]byte(line + "\n")) + _, err := g.headBuf.Write([]byte(line + "\n")) return err } @@ -329,7 +331,7 @@ GROUP_LOOP: } // Scan each line and test whether line matches for { - line, err := r.ReadLineInCurrent() + line, err := r.ReadLine() if err == io.EOF { if found { return match, found, nil @@ -343,6 +345,13 @@ GROUP_LOOP: match = line found = true } + if r.CurIndex() > i { + if found { + return match, found, nil + } else { + continue GROUP_LOOP + } + } } } @@ -461,21 +470,11 @@ func (gr *GroupReader) Close() error { } } +// Reads a line (without delimiter) +// just return io.EOF if no new lines found. func (gr *GroupReader) ReadLine() (string, error) { gr.mtx.Lock() defer gr.mtx.Unlock() - return gr.readLineWithOptions(false) -} - -func (gr *GroupReader) ReadLineInCurrent() (string, error) { - gr.mtx.Lock() - defer gr.mtx.Unlock() - return gr.readLineWithOptions(true) -} - -// curFileOnly: if True, do not open new files, -// just return io.EOF if no new lines found. 
-func (gr *GroupReader) readLineWithOptions(curFileOnly bool) (string, error) { // From PushLine if gr.curLine != nil { @@ -493,23 +492,25 @@ func (gr *GroupReader) readLineWithOptions(curFileOnly bool) (string, error) { } // Iterate over files until line is found + var linePrefix string for { - bytes, err := gr.curReader.ReadBytes('\n') - if err != nil { - if err != io.EOF { - return "", err - } else if curFileOnly { + bytesRead, err := gr.curReader.ReadBytes('\n') + if err == io.EOF { + // Open the next file + err := gr.openFile(gr.curIndex + 1) + if err != nil { return "", err + } + if len(bytesRead) > 0 && bytesRead[len(bytesRead)-1] == byte('\n') { + return linePrefix + string(bytesRead[:len(bytesRead)-1]), nil } else { - // Open the next file - err := gr.openFile(gr.curIndex + 1) - if err != nil { - return "", err - } + linePrefix += string(bytesRead) continue } + } else if err != nil { + return "", err } - return string(bytes), nil + return linePrefix + string(bytesRead[:len(bytesRead)-1]), nil } } From a528af55d3c8354f676b4a5f718ab51d9b9fbb9f Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Mon, 21 Nov 2016 19:09:14 -0800 Subject: [PATCH 11/17] Group is a BaseService; TotalSizeLimit enforced; tests fixed --- group.go | 117 ++++++++++++++++++++++++++++++++++++++------------ group_test.go | 27 ++++++++---- 2 files changed, 108 insertions(+), 36 deletions(-) diff --git a/group.go b/group.go index 1c984249..a2584dfc 100644 --- a/group.go +++ b/group.go @@ -3,7 +3,9 @@ package autofile import ( "bufio" "errors" + "fmt" "io" + "log" "os" "path" "path/filepath" @@ -45,10 +47,14 @@ The Group can also be used to binary-search for some line, assuming that marker lines are written occasionally. */ -const groupCheckDuration = 1000 * time.Millisecond -const defaultHeadSizeLimit = 10 * 1024 * 1024 // 10MB +const groupCheckDuration = 5000 * time.Millisecond +const defaultHeadSizeLimit = 10 * 1024 * 1024 // 10MB +const defaultTotalSizeLimit = 1 * 1024 * 1024 * 1024 // 1GB +const maxFilesToRemove = 4 // needs to be greater than 1 type Group struct { + BaseService + ID string Head *AutoFile // The head AutoFile to write to headBuf *bufio.Writer @@ -64,23 +70,43 @@ type Group struct { // and their dependencies. 
} -func OpenGroup(head *AutoFile) (g *Group, err error) { - dir := path.Dir(head.Path) +func OpenGroup(headPath string) (g *Group, err error) { + + dir := path.Dir(headPath) + head, err := OpenAutoFile(headPath) + if err != nil { + return nil, err + } g = &Group{ - ID: "group:" + head.ID, - Head: head, - headBuf: bufio.NewWriterSize(head, 4096*10), - Dir: dir, - ticker: time.NewTicker(groupCheckDuration), - headSizeLimit: defaultHeadSizeLimit, - minIndex: 0, - maxIndex: 0, + ID: "group:" + head.ID, + Head: head, + headBuf: bufio.NewWriterSize(head, 4096*10), + Dir: dir, + ticker: time.NewTicker(groupCheckDuration), + headSizeLimit: defaultHeadSizeLimit, + totalSizeLimit: defaultTotalSizeLimit, + minIndex: 0, + maxIndex: 0, } + g.BaseService = *NewBaseService(nil, "Group", g) + gInfo := g.readGroupInfo() g.minIndex = gInfo.MinIndex g.maxIndex = gInfo.MaxIndex + return +} + +func (g *Group) OnStart() error { + g.BaseService.OnStart() go g.processTicks() + return nil +} + +// NOTE: g.Head must be closed separately +func (g *Group) OnStop() { + g.BaseService.OnStop() + g.ticker.Stop() return } @@ -118,14 +144,16 @@ func (g *Group) MaxIndex() int { // NOTE: Writes are buffered so they don't write synchronously // TODO: Make it halt if space is unavailable func (g *Group) WriteLine(line string) error { + g.mtx.Lock() + defer g.mtx.Unlock() _, err := g.headBuf.Write([]byte(line + "\n")) return err } -// NOTE: g.Head must be closed separately -func (g *Group) Close() error { - g.ticker.Stop() - return nil +func (g *Group) Flush() error { + g.mtx.Lock() + defer g.mtx.Unlock() + return g.headBuf.Flush() } func (g *Group) processTicks() { @@ -146,25 +174,58 @@ func (g *Group) stopTicker() { // NOTE: this function is called manually in tests. func (g *Group) checkHeadSizeLimit() { + limit := g.HeadSizeLimit() + if limit == 0 { + return + } size, err := g.Head.Size() if err != nil { panic(err) } - if size >= g.HeadSizeLimit() { + if size >= limit { g.RotateFile() } } func (g *Group) checkTotalSizeLimit() { - // TODO enforce total size limit - // CHALLENGE + limit := g.TotalSizeLimit() + if limit == 0 { + return + } + + gInfo := g.readGroupInfo() + totalSize := gInfo.TotalSize + for i := 0; i < maxFilesToRemove; i++ { + fmt.Println(">>", gInfo, totalSize, i) + index := gInfo.MinIndex + i + if totalSize < limit { + return + } + if index == gInfo.MaxIndex { + // Special degenerate case, just do nothing. 
+ log.Println("WARNING: Group's head " + g.Head.Path + "may grow without bound") + return + } + pathToRemove := filePathForIndex(g.Head.Path, gInfo.MinIndex, gInfo.MaxIndex) + fileInfo, err := os.Stat(pathToRemove) + if err != nil { + log.Println("WARNING: Failed to fetch info for file @" + pathToRemove) + continue + } + err = os.Remove(pathToRemove) + if err != nil { + log.Println(err) + return + } + totalSize -= fileInfo.Size() + } } func (g *Group) RotateFile() { g.mtx.Lock() defer g.mtx.Unlock() - dstPath := filePathForIndex(g.Head.Path, g.maxIndex) + dstPath := filePathForIndex(g.Head.Path, g.maxIndex, g.maxIndex+1) err := os.Rename(g.Head.Path, dstPath) if err != nil { panic(err) @@ -429,8 +490,12 @@ func (g *Group) readGroupInfo() GroupInfo { return GroupInfo{minIndex, maxIndex, totalSize, headSize} } -func filePathForIndex(headPath string, index int) string { - return fmt.Sprintf("%v.%03d", headPath, index) +func filePathForIndex(headPath string, index int, maxIndex int) string { + if index == maxIndex { + return headPath + } else { + return fmt.Sprintf("%v.%03d", headPath, index) + } } //-------------------------------------------------------------------------------- @@ -522,15 +587,11 @@ func (gr *GroupReader) openFile(index int) error { gr.Group.mtx.Lock() defer gr.Group.mtx.Unlock() - var curFilePath string - if index == gr.Group.maxIndex { - curFilePath = gr.Head.Path - } else if index > gr.Group.maxIndex { + if index > gr.Group.maxIndex { return io.EOF - } else { - curFilePath = filePathForIndex(gr.Head.Path, index) } + curFilePath := filePathForIndex(gr.Head.Path, index, gr.Group.maxIndex) curFile, err := os.Open(curFilePath) if err != nil { return err diff --git a/group_test.go b/group_test.go index 672bd4d9..1c2280e8 100644 --- a/group_test.go +++ b/group_test.go @@ -21,11 +21,7 @@ func createTestGroup(t *testing.T, headSizeLimit int64) *Group { t.Fatal("Error creating dir", err) } headPath := testDir + "/myfile" - autofile, err := OpenAutoFile(headPath) - if err != nil { - t.Fatal("Error opening AutoFile", headPath, err) - } - g, err := OpenGroup(autofile) + g, err := OpenGroup(headPath) if err != nil { t.Fatal("Error opening Group", err) } @@ -73,6 +69,7 @@ func TestCheckHeadSizeLimit(t *testing.T) { t.Fatal("Error appending to head", err) } } + g.Flush() assertGroupInfo(t, g.ReadGroupInfo(), 0, 0, 999000, 999000) // Even calling checkHeadSizeLimit manually won't rotate it. @@ -84,6 +81,7 @@ func TestCheckHeadSizeLimit(t *testing.T) { if err != nil { t.Fatal("Error appending to head", err) } + g.Flush() // Calling checkHeadSizeLimit this time rolls it. g.checkHeadSizeLimit() @@ -94,6 +92,7 @@ func TestCheckHeadSizeLimit(t *testing.T) { if err != nil { t.Fatal("Error appending to head", err) } + g.Flush() // Calling checkHeadSizeLimit does nothing. g.checkHeadSizeLimit() @@ -106,6 +105,7 @@ func TestCheckHeadSizeLimit(t *testing.T) { t.Fatal("Error appending to head", err) } } + g.Flush() assertGroupInfo(t, g.ReadGroupInfo(), 0, 1, 2000000, 1000000) // Calling checkHeadSizeLimit rolls it again. @@ -117,6 +117,7 @@ func TestCheckHeadSizeLimit(t *testing.T) { if err != nil { t.Fatal("Error appending to head", err) } + g.Flush() assertGroupInfo(t, g.ReadGroupInfo(), 0, 2, 2001000, 1000) // Calling checkHeadSizeLimit does nothing. 
@@ -256,10 +257,12 @@ func TestRotateFile(t *testing.T) { g.WriteLine("Line 1") g.WriteLine("Line 2") g.WriteLine("Line 3") + g.Flush() g.RotateFile() g.WriteLine("Line 4") g.WriteLine("Line 5") g.WriteLine("Line 6") + g.Flush() // Read g.Head.Path+"000" body1, err := ioutil.ReadFile(g.Head.Path + ".000") @@ -290,11 +293,13 @@ func TestFindLast1(t *testing.T) { g.WriteLine("Line 2") g.WriteLine("# a") g.WriteLine("Line 3") + g.Flush() g.RotateFile() g.WriteLine("Line 4") g.WriteLine("Line 5") g.WriteLine("Line 6") g.WriteLine("# b") + g.Flush() match, found, err := g.FindLast("#") if err != nil { @@ -303,7 +308,7 @@ func TestFindLast1(t *testing.T) { if !found { t.Error("Expected found=True") } - if match != "# b\n" { + if match != "# b" { t.Errorf("Unexpected match: [%v]", match) } @@ -317,12 +322,14 @@ func TestFindLast2(t *testing.T) { g.WriteLine("Line 1") g.WriteLine("Line 2") g.WriteLine("Line 3") + g.Flush() g.RotateFile() g.WriteLine("# a") g.WriteLine("Line 4") g.WriteLine("Line 5") g.WriteLine("# b") g.WriteLine("Line 6") + g.Flush() match, found, err := g.FindLast("#") if err != nil { @@ -331,7 +338,7 @@ func TestFindLast2(t *testing.T) { if !found { t.Error("Expected found=True") } - if match != "# b\n" { + if match != "# b" { t.Errorf("Unexpected match: [%v]", match) } @@ -347,10 +354,12 @@ func TestFindLast3(t *testing.T) { g.WriteLine("Line 2") g.WriteLine("# b") g.WriteLine("Line 3") + g.Flush() g.RotateFile() g.WriteLine("Line 4") g.WriteLine("Line 5") g.WriteLine("Line 6") + g.Flush() match, found, err := g.FindLast("#") if err != nil { @@ -359,7 +368,7 @@ func TestFindLast3(t *testing.T) { if !found { t.Error("Expected found=True") } - if match != "# b\n" { + if match != "# b" { t.Errorf("Unexpected match: [%v]", match) } @@ -373,10 +382,12 @@ func TestFindLast4(t *testing.T) { g.WriteLine("Line 1") g.WriteLine("Line 2") g.WriteLine("Line 3") + g.Flush() g.RotateFile() g.WriteLine("Line 4") g.WriteLine("Line 5") g.WriteLine("Line 6") + g.Flush() match, found, err := g.FindLast("#") if err != nil { From dd12bd8f1b59b6ee75ae6ce1c1c70a5c2dc32f11 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Mon, 21 Nov 2016 19:57:17 -0800 Subject: [PATCH 12/17] Fix checkTotalSizeLimit bug; remove more than 1 file at a time --- group.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/group.go b/group.go index a2584dfc..d3f724ad 100644 --- a/group.go +++ b/group.go @@ -206,7 +206,7 @@ func (g *Group) checkTotalSizeLimit() { log.Println("WARNING: Group's head " + g.Head.Path + "may grow without bound") return } - pathToRemove := filePathForIndex(g.Head.Path, gInfo.MinIndex, gInfo.MaxIndex) + pathToRemove := filePathForIndex(g.Head.Path, index, gInfo.MaxIndex) fileInfo, err := os.Stat(pathToRemove) if err != nil { log.Println("WARNING: Failed to fetch info for file @" + pathToRemove) From 2a306419c88d10fab038f19dcbe4535e740b0aa0 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Mon, 21 Nov 2016 20:19:01 -0800 Subject: [PATCH 13/17] Remove spurious fmt --- group.go | 1 - 1 file changed, 1 deletion(-) diff --git a/group.go b/group.go index d3f724ad..4de0d034 100644 --- a/group.go +++ b/group.go @@ -196,7 +196,6 @@ func (g *Group) checkTotalSizeLimit() { gInfo := g.readGroupInfo() totalSize := gInfo.TotalSize for i := 0; i < maxFilesToRemove; i++ { - fmt.Println(">>", gInfo, totalSize, i) index := gInfo.MinIndex + i if totalSize < limit { return From 63186e34b33d78ae47fb0d25e5717b307fdf3603 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Mon, 21 Nov 2016 20:26:47 -0800 Subject: [PATCH 
14/17] Fix race condition --- autofile.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/autofile.go b/autofile.go index 60a314a4..e61bbb83 100644 --- a/autofile.go +++ b/autofile.go @@ -100,6 +100,9 @@ func (af *AutoFile) Write(b []byte) (n int, err error) { } func (af *AutoFile) Sync() error { + af.mtx.Lock() + defer af.mtx.Unlock() + return af.file.Sync() } @@ -115,6 +118,7 @@ func (af *AutoFile) openFile() error { func (af *AutoFile) Size() (int64, error) { af.mtx.Lock() defer af.mtx.Unlock() + if af.file == nil { err := af.openFile() if err != nil { From 0416e0aa9c68205aa44844096f9f151ada9d0405 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Tue, 6 Dec 2016 01:46:23 -0800 Subject: [PATCH 15/17] Close opened files --- group.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/group.go b/group.go index 4de0d034..ee1a9415 100644 --- a/group.go +++ b/group.go @@ -444,6 +444,7 @@ func (g *Group) readGroupInfo() GroupInfo { if err != nil { panic(err) } + defer dir.Close() fiz, err := dir.Readdir(0) if err != nil { panic(err) @@ -598,6 +599,9 @@ func (gr *GroupReader) openFile(index int) error { curReader := bufio.NewReader(curFile) // Update gr.cur* + if gr.curFile != nil { + gr.curFile.Close() // TODO return error? + } gr.curIndex = index gr.curFile = curFile gr.curReader = curReader From c3b80061662fe0d3108fc35e283b5a79444d9fa7 Mon Sep 17 00:00:00 2001 From: Alessio Treglia Date: Sat, 28 Jan 2017 12:42:01 +0000 Subject: [PATCH 16/17] Add LICENSE file --- LICENSE | 203 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 203 insertions(+) create mode 100644 LICENSE diff --git a/LICENSE b/LICENSE new file mode 100644 index 00000000..3a48142a --- /dev/null +++ b/LICENSE @@ -0,0 +1,203 @@ +All files are Copyright (C) 2017 Tendermint + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). 
+ + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. 
In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "{}" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright {yyyy} {name of copyright owner} + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. From 1c977f78fd17d2522123908016706cfcc27801e3 Mon Sep 17 00:00:00 2001 From: Jae Kwon Date: Sat, 28 Jan 2017 08:01:29 -0800 Subject: [PATCH 17/17] Update LICENSE --- LICENSE | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/LICENSE b/LICENSE index 3a48142a..9527e268 100644 --- a/LICENSE +++ b/LICENSE @@ -1,4 +1,4 @@ -All files are Copyright (C) 2017 Tendermint +All files are Copyright (C) 2017 All in Bits, Inc Apache License Version 2.0, January 2004
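
Taken together, patches 10–15 leave the Group API in roughly this shape: OpenGroup takes the head file's path and opens the AutoFile itself, writes are buffered until Flush, RotateFile moves the head aside to numbered files ("<head>.000", "<head>.001", ...), and FindLast now returns matches without the trailing newline. Below is a minimal usage sketch under those assumptions; the import path and the /tmp location are illustrative and not taken from the patches, and error handling is kept deliberately terse.

```go
package main

import (
	"fmt"
	"log"

	"github.com/tendermint/go-autofile" // assumed import path for this repo
)

func main() {
	// OpenGroup now takes the head path directly (PATCH 11) and opens the
	// underlying AutoFile itself. The parent directory must already exist.
	g, err := autofile.OpenGroup("/tmp/autofile_demo_head") // illustrative path
	if err != nil {
		log.Fatal(err)
	}

	// Writes go through a bufio.Writer; Flush pushes them to the head file.
	g.WriteLine("# marker a")
	for i := 0; i < 3; i++ {
		g.WriteLine(fmt.Sprintf("Line %v", i))
	}
	if err := g.Flush(); err != nil {
		log.Fatal(err)
	}

	// Manual rotation renames the head to "<head>.000", "<head>.001", ...;
	// with the service started, checkHeadSizeLimit does this on the ticker.
	g.RotateFile()

	// FindLast scans the rotated files and the head for the last line with
	// the given prefix; since patches 10/11 the trailing '\n' is stripped.
	match, found, err := g.FindLast("#")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(match, found)

	// NOTE: g.Head must be closed separately.
	g.Head.Close()
}
```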