db: fix c and go iterators

This commit is contained in:
Ethan Buchman
2017-12-13 22:28:37 -05:00
parent d4aeca8ce3
commit a2f7898b6d
3 changed files with 79 additions and 86 deletions

View File

@ -3,6 +3,7 @@
package db package db
import ( import (
"bytes"
"fmt" "fmt"
"path/filepath" "path/filepath"
@ -166,7 +167,7 @@ func (db *CLevelDB) Iterator(start, end []byte) Iterator {
} else { } else {
itr.SeekToFirst() itr.SeekToFirst()
} }
return cLevelDBIterator{ return &cLevelDBIterator{
itr: itr, itr: itr,
start: start, start: start,
end: end, end: end,
@ -183,43 +184,58 @@ var _ Iterator = (*cLevelDBIterator)(nil)
type cLevelDBIterator struct { type cLevelDBIterator struct {
itr *levigo.Iterator itr *levigo.Iterator
start, end []byte start, end []byte
invalid bool
} }
func (c cLevelDBIterator) Domain() ([]byte, []byte) { func (c *cLevelDBIterator) Domain() ([]byte, []byte) {
return c.start, c.end return c.start, c.end
} }
func (c cLevelDBIterator) Valid() bool { func (c *cLevelDBIterator) Valid() bool {
c.assertNoError() c.assertNoError()
return c.itr.Valid() if c.invalid {
return false
}
c.invalid = !c.itr.Valid()
return !c.invalid
} }
func (c cLevelDBIterator) Key() []byte { func (c *cLevelDBIterator) Key() []byte {
if !c.itr.Valid() { if !c.Valid() {
panic("cLevelDBIterator Key() called when invalid") panic("cLevelDBIterator Key() called when invalid")
} }
return c.itr.Key() return c.itr.Key()
} }
func (c cLevelDBIterator) Value() []byte { func (c *cLevelDBIterator) Value() []byte {
if !c.itr.Valid() { if !c.Valid() {
panic("cLevelDBIterator Value() called when invalid") panic("cLevelDBIterator Value() called when invalid")
} }
return c.itr.Value() return c.itr.Value()
} }
func (c cLevelDBIterator) Next() { func (c *cLevelDBIterator) Next() {
if !c.itr.Valid() { if !c.Valid() {
panic("cLevelDBIterator Next() called when invalid") panic("cLevelDBIterator Next() called when invalid")
} }
c.itr.Next() c.itr.Next()
c.checkEndKey() // if we've exceeded the range, we're now invalid
} }
func (c cLevelDBIterator) Release() { // levigo has no upper bound when iterating, so need to check ourselves
func (c *cLevelDBIterator) checkEndKey() []byte {
key := c.itr.Key()
if c.end != nil && bytes.Compare(key, c.end) > 0 {
c.invalid = true
}
return key
}
func (c *cLevelDBIterator) Release() {
c.itr.Close() c.itr.Close()
} }
func (c cLevelDBIterator) assertNoError() { func (c *cLevelDBIterator) assertNoError() {
if err := c.itr.GetError(); err != nil { if err := c.itr.GetError(); err != nil {
panic(err) panic(err)
} }

View File

@ -8,6 +8,7 @@ import (
"github.com/syndtr/goleveldb/leveldb/errors" "github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/iterator" "github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/opt" "github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/util"
. "github.com/tendermint/tmlibs/common" . "github.com/tendermint/tmlibs/common"
) )
@ -169,13 +170,24 @@ func (mBatch *goLevelDBBatch) Write() {
//---------------------------------------- //----------------------------------------
// Iterator // Iterator
func (db *GoLevelDB) Iterator(start, end []byte) Iterator { // https://godoc.org/github.com/syndtr/goleveldb/leveldb#DB.NewIterator
itr := db.db.NewIterator(nil, nil) // A nil Range.Start is treated as a key before all keys in the DB.
if len(start) > 0 { // And a nil Range.Limit is treated as a key after all keys in the DB.
itr.Seek(start) func goLevelDBIterRange(start, end []byte) *util.Range {
} else { // XXX: what if start == nil ?
itr.First() if len(start) == 0 {
start = nil
} }
return &util.Range{
Start: start,
Limit: end,
}
}
func (db *GoLevelDB) Iterator(start, end []byte) Iterator {
itrRange := goLevelDBIterRange(start, end)
itr := db.db.NewIterator(itrRange, nil)
itr.Seek(start) // if we don't call this the itr is never valid (?!)
return &goLevelDBIterator{ return &goLevelDBIterator{
source: itr, source: itr,
start: start, start: start,

View File

@ -5,6 +5,7 @@ import (
"testing" "testing"
) )
// empty iterator for empty db
func TestPrefixIteratorNoMatchNil(t *testing.T) { func TestPrefixIteratorNoMatchNil(t *testing.T) {
for backend, _ := range backends { for backend, _ := range backends {
t.Run(fmt.Sprintf("Prefix w/ backend %s", backend), func(t *testing.T) { t.Run(fmt.Sprintf("Prefix w/ backend %s", backend), func(t *testing.T) {
@ -16,6 +17,7 @@ func TestPrefixIteratorNoMatchNil(t *testing.T) {
} }
} }
// empty iterator for db populated after iterator created
func TestPrefixIteratorNoMatch1(t *testing.T) { func TestPrefixIteratorNoMatch1(t *testing.T) {
for backend, _ := range backends { for backend, _ := range backends {
t.Run(fmt.Sprintf("Prefix w/ backend %s", backend), func(t *testing.T) { t.Run(fmt.Sprintf("Prefix w/ backend %s", backend), func(t *testing.T) {
@ -28,24 +30,8 @@ func TestPrefixIteratorNoMatch1(t *testing.T) {
} }
} }
func TestPrefixIteratorMatch2(t *testing.T) { // empty iterator for prefix starting above db entry
for backend, _ := range backends { func TestPrefixIteratorNoMatch2(t *testing.T) {
t.Run(fmt.Sprintf("Prefix w/ backend %s", backend), func(t *testing.T) {
db := newTempDB(t, backend)
db.SetSync(bz("2"), bz("value_2"))
itr := IteratePrefix(db, []byte("2"))
checkValid(t, itr, true)
checkItem(t, itr, bz("2"), bz("value_2"))
checkNext(t, itr, false)
// Once invalid...
checkInvalid(t, itr)
})
}
}
func TestPrefixIteratorMatch3(t *testing.T) {
for backend, _ := range backends { for backend, _ := range backends {
t.Run(fmt.Sprintf("Prefix w/ backend %s", backend), func(t *testing.T) { t.Run(fmt.Sprintf("Prefix w/ backend %s", backend), func(t *testing.T) {
db := newTempDB(t, backend) db := newTempDB(t, backend)
@ -58,14 +44,40 @@ func TestPrefixIteratorMatch3(t *testing.T) {
} }
} }
// Search for a/1, fail by too much Next() // iterator with single val for db with single val, starting from that val
func TestPrefixIteratorMatch1(t *testing.T) {
for backend, _ := range backends {
t.Run(fmt.Sprintf("Prefix w/ backend %s", backend), func(t *testing.T) {
db := newTempDB(t, backend)
db.SetSync(bz("2"), bz("value_2"))
itr := IteratePrefix(db, bz("2"))
checkValid(t, itr, true)
checkItem(t, itr, bz("2"), bz("value_2"))
checkNext(t, itr, false)
// Once invalid...
checkInvalid(t, itr)
})
}
}
// iterator with prefix iterates over everything with same prefix
func TestPrefixIteratorMatches1N(t *testing.T) { func TestPrefixIteratorMatches1N(t *testing.T) {
for backend, _ := range backends { for backend, _ := range backends {
t.Run(fmt.Sprintf("Prefix w/ backend %s", backend), func(t *testing.T) { t.Run(fmt.Sprintf("Prefix w/ backend %s", backend), func(t *testing.T) {
db := newTempDB(t, backend) db := newTempDB(t, backend)
// prefixed
db.SetSync(bz("a/1"), bz("value_1")) db.SetSync(bz("a/1"), bz("value_1"))
db.SetSync(bz("a/3"), bz("value_3")) db.SetSync(bz("a/3"), bz("value_3"))
itr := IteratePrefix(db, []byte("a/"))
// not
db.SetSync(bz("b/3"), bz("value_3"))
db.SetSync(bz("a-3"), bz("value_3"))
db.SetSync(bz("a.3"), bz("value_3"))
db.SetSync(bz("abcdefg"), bz("value_3"))
itr := IteratePrefix(db, bz("a/"))
checkValid(t, itr, true) checkValid(t, itr, true)
checkItem(t, itr, bz("a/1"), bz("value_1")) checkItem(t, itr, bz("a/1"), bz("value_1"))
@ -75,54 +87,7 @@ func TestPrefixIteratorMatches1N(t *testing.T) {
// Bad! // Bad!
checkNext(t, itr, false) checkNext(t, itr, false)
// Once invalid... //Once invalid...
checkInvalid(t, itr)
})
}
}
// Search for a/2, fail by too much Next()
func TestPrefixIteratorMatches2N(t *testing.T) {
for backend, _ := range backends {
t.Run(fmt.Sprintf("Prefix w/ backend %s", backend), func(t *testing.T) {
db := newTempDB(t, backend)
db.SetSync(bz("a/1"), bz("value_1"))
db.SetSync(bz("a/3"), bz("value_3"))
itr := IteratePrefix(db, []byte("a/"))
checkValid(t, itr, true)
checkItem(t, itr, bz("a/1"), bz("value_1"))
checkNext(t, itr, true)
checkValid(t, itr, true)
checkItem(t, itr, bz("a/3"), bz("value_3"))
// Bad!
checkNext(t, itr, false)
// Once invalid...
checkInvalid(t, itr)
})
}
}
// Search for a/3, fail by too much Next()
func TestPrefixIteratorMatches3N(t *testing.T) {
for backend, _ := range backends {
t.Run(fmt.Sprintf("Prefix w/ backend %s", backend), func(t *testing.T) {
db := newTempDB(t, backend)
db.SetSync(bz("a/1"), bz("value_1"))
db.SetSync(bz("a/3"), bz("value_3"))
itr := IteratePrefix(db, []byte("a/"))
checkValid(t, itr, true)
checkItem(t, itr, bz("a/1"), bz("value_1"))
checkNext(t, itr, true)
checkItem(t, itr, bz("a/3"), bz("value_3"))
// Bad!
checkNext(t, itr, false)
// Once invalid...
checkInvalid(t, itr) checkInvalid(t, itr)
}) })
} }