diff --git a/autofile/group.go b/autofile/group.go index eedb67b5..bbf77d27 100644 --- a/autofile/group.go +++ b/autofile/group.go @@ -18,6 +18,13 @@ import ( . "github.com/tendermint/tmlibs/common" ) +const ( + groupCheckDuration = 5000 * time.Millisecond + defaultHeadSizeLimit = 10 * 1024 * 1024 // 10MB + defaultTotalSizeLimit = 1 * 1024 * 1024 * 1024 // 1GB + maxFilesToRemove = 4 // needs to be greater than 1 +) + /* You can open a Group to keep restrictions on an AutoFile, like the maximum size of each chunk, and/or the total amount of bytes @@ -25,33 +32,27 @@ stored in the group. The first file to be written in the Group.Dir is the head file. - Dir/ - - + Dir/ + - Once the Head file reaches the size limit, it will be rotated. - Dir/ - - .000 // First rolled file - - // New head path, starts empty. - // The implicit index is 001. + Dir/ + - .000 // First rolled file + - // New head path, starts empty. + // The implicit index is 001. As more files are written, the index numbers grow... - Dir/ - - .000 // First rolled file - - .001 // Second rolled file - - ... - - // New head path + Dir/ + - .000 // First rolled file + - .001 // Second rolled file + - ... + - // New head path The Group can also be used to binary-search for some line, assuming that marker lines are written occasionally. */ - -const groupCheckDuration = 5000 * time.Millisecond -const defaultHeadSizeLimit = 10 * 1024 * 1024 // 10MB -const defaultTotalSizeLimit = 1 * 1024 * 1024 * 1024 // 1GB -const maxFilesToRemove = 4 // needs to be greater than 1 - type Group struct { BaseService @@ -109,37 +110,61 @@ func (g *Group) OnStop() { g.ticker.Stop() } +// SetHeadSizeLimit allows you to overwrite default head size limit - 10MB. func (g *Group) SetHeadSizeLimit(limit int64) { g.mtx.Lock() g.headSizeLimit = limit g.mtx.Unlock() } +// HeadSizeLimit returns the current head size limit. func (g *Group) HeadSizeLimit() int64 { g.mtx.Lock() defer g.mtx.Unlock() return g.headSizeLimit } +// SetTotalSizeLimit allows you to overwrite default total size limit of the +// group - 1GB. func (g *Group) SetTotalSizeLimit(limit int64) { g.mtx.Lock() g.totalSizeLimit = limit g.mtx.Unlock() } +// TotalSizeLimit returns total size limit of the group. func (g *Group) TotalSizeLimit() int64 { g.mtx.Lock() defer g.mtx.Unlock() return g.totalSizeLimit } +// MaxIndex returns index of the last file in the group. func (g *Group) MaxIndex() int { g.mtx.Lock() defer g.mtx.Unlock() return g.maxIndex } -// Auto appends "\n" +// MinIndex returns index of the first file in the group. +func (g *Group) MinIndex() int { + g.mtx.Lock() + defer g.mtx.Unlock() + return g.minIndex +} + +// Write writes the contents of p into the current head of the group. It +// returns the number of bytes written. If nn < len(p), it also returns an +// error explaining why the write is short. +// NOTE: Writes are buffered so they don't write synchronously +// TODO: Make it halt if space is unavailable +func (g *Group) Write(p []byte) (nn int, err error) { + g.mtx.Lock() + defer g.mtx.Unlock() + return g.headBuf.Write(p) +} + +// WriteLine writes line into the current head of the group. It also appends "\n". // NOTE: Writes are buffered so they don't write synchronously // TODO: Make it halt if space is unavailable func (g *Group) WriteLine(line string) error { @@ -149,6 +174,8 @@ func (g *Group) WriteLine(line string) error { return err } +// Flush writes any buffered data to the underlying file and commits the +// current content of the file to stable storage. func (g *Group) Flush() error { g.mtx.Lock() defer g.mtx.Unlock() @@ -223,6 +250,8 @@ func (g *Group) checkTotalSizeLimit() { } } +// RotateFile causes group to close the current head and assign it some index. +// Note it does not create a new head. func (g *Group) RotateFile() { g.mtx.Lock() defer g.mtx.Unlock() @@ -241,8 +270,8 @@ func (g *Group) RotateFile() { g.maxIndex += 1 } -// NOTE: if error, returns no GroupReader. -// CONTRACT: Caller must close the returned GroupReader +// NewReader returns a new group reader. +// CONTRACT: Caller must close the returned GroupReader. func (g *Group) NewReader(index int) (*GroupReader, error) { r := newGroupReader(g) err := r.SetIndex(index) @@ -423,14 +452,15 @@ GROUP_LOOP: return } +// GroupInfo holds information about the group. type GroupInfo struct { - MinIndex int - MaxIndex int - TotalSize int64 - HeadSize int64 + MinIndex int // index of the first file in the group, including head + MaxIndex int // index of the last file in the group, including head + TotalSize int64 // total size of the group + HeadSize int64 // size of the head } -// Returns info after scanning all files in g.Head's dir +// Returns info after scanning all files in g.Head's dir. func (g *Group) ReadGroupInfo() GroupInfo { g.mtx.Lock() defer g.mtx.Unlock() @@ -505,6 +535,7 @@ func filePathForIndex(headPath string, index int, maxIndex int) string { //-------------------------------------------------------------------------------- +// GroupReader provides an interface for reading from a Group. type GroupReader struct { *Group mtx sync.Mutex @@ -524,6 +555,7 @@ func newGroupReader(g *Group) *GroupReader { } } +// Close closes the GroupReader by closing the cursor file. func (gr *GroupReader) Close() error { gr.mtx.Lock() defer gr.mtx.Unlock() @@ -540,7 +572,48 @@ func (gr *GroupReader) Close() error { } } -// Reads a line (without delimiter) +// Read implements io.Reader, reading bytes from the current Reader +// incrementing index until enough bytes are read. +func (gr *GroupReader) Read(p []byte) (n int, err error) { + lenP := len(p) + if lenP == 0 { + return 0, errors.New("given empty slice") + } + + gr.mtx.Lock() + defer gr.mtx.Unlock() + + // Open file if not open yet + if gr.curReader == nil { + if err = gr.openFile(gr.curIndex); err != nil { + return 0, err + } + } + + // Iterate over files until enough bytes are read + var nn int + for { + nn, err = gr.curReader.Read(p[n:]) + n += nn + if err == io.EOF { + // Open the next file + if err1 := gr.openFile(gr.curIndex + 1); err1 != nil { + return n, err1 + } + if n >= lenP { + return n, nil + } else { + continue + } + } else if err != nil { + return n, err + } else if nn == 0 { // empty file + return n, err + } + } +} + +// ReadLine reads a line (without delimiter). // just return io.EOF if no new lines found. func (gr *GroupReader) ReadLine() (string, error) { gr.mtx.Lock() @@ -613,6 +686,9 @@ func (gr *GroupReader) openFile(index int) error { return nil } +// PushLine makes the given line the current one, so the next time somebody +// calls ReadLine, this line will be returned. +// panics if called twice without calling ReadLine. func (gr *GroupReader) PushLine(line string) { gr.mtx.Lock() defer gr.mtx.Unlock() @@ -624,13 +700,15 @@ func (gr *GroupReader) PushLine(line string) { } } -// Cursor's file index. +// CurIndex returns cursor's file index. func (gr *GroupReader) CurIndex() int { gr.mtx.Lock() defer gr.mtx.Unlock() return gr.curIndex } +// SetIndex sets the cursor's file index to index by opening a file at this +// position. func (gr *GroupReader) SetIndex(index int) error { gr.mtx.Lock() defer gr.mtx.Unlock() diff --git a/autofile/group_test.go b/autofile/group_test.go index 0cfcef72..398ea3ae 100644 --- a/autofile/group_test.go +++ b/autofile/group_test.go @@ -1,6 +1,7 @@ package autofile import ( + "bytes" "errors" "io" "io/ioutil" @@ -400,3 +401,93 @@ func TestFindLast4(t *testing.T) { // Cleanup destroyTestGroup(t, g) } + +func TestWrite(t *testing.T) { + g := createTestGroup(t, 0) + + written := []byte("Medusa") + g.Write(written) + g.Flush() + + read := make([]byte, len(written)) + gr, err := g.NewReader(0) + if err != nil { + t.Fatalf("Failed to create reader: %v", err) + } + _, err = gr.Read(read) + if err != nil { + t.Fatalf("Failed to read data: %v", err) + } + + if !bytes.Equal(written, read) { + t.Errorf("%s, %s should be equal", string(written), string(read)) + } + + // Cleanup + destroyTestGroup(t, g) +} + +func TestGroupReaderRead(t *testing.T) { + g := createTestGroup(t, 0) + + professor := []byte("Professor Monster") + g.Write(professor) + g.Flush() + g.RotateFile() + frankenstein := []byte("Frankenstein's Monster") + g.Write(frankenstein) + g.Flush() + + totalWrittenLength := len(professor) + len(frankenstein) + read := make([]byte, totalWrittenLength) + gr, err := g.NewReader(0) + if err != nil { + t.Fatalf("Failed to create reader: %v", err) + } + n, err := gr.Read(read) + if err != nil { + t.Fatalf("Failed to read data: %v", err) + } + if n != totalWrittenLength { + t.Errorf("Failed to read enough bytes: wanted %d, but read %d", totalWrittenLength, n) + } + + professorPlusFrankenstein := professor + professorPlusFrankenstein = append(professorPlusFrankenstein, frankenstein...) + if !bytes.Equal(read, professorPlusFrankenstein) { + t.Errorf("%s, %s should be equal", string(professorPlusFrankenstein), string(read)) + } + + // Cleanup + destroyTestGroup(t, g) +} + +func TestMinIndex(t *testing.T) { + g := createTestGroup(t, 0) + + if g.MinIndex() != 0 { + t.Error("MinIndex should be zero at the beginning") + } + + // Cleanup + destroyTestGroup(t, g) +} + +func TestMaxIndex(t *testing.T) { + g := createTestGroup(t, 0) + + if g.MaxIndex() != 0 { + t.Error("MaxIndex should be zero at the beginning") + } + + g.WriteLine("Line 1") + g.Flush() + g.RotateFile() + + if g.MaxIndex() != 1 { + t.Error("MaxIndex should point to the last file") + } + + // Cleanup + destroyTestGroup(t, g) +}