diff --git a/db/c_level_db.go b/db/c_level_db.go index e4450aaa..e867b000 100644 --- a/db/c_level_db.go +++ b/db/c_level_db.go @@ -22,8 +22,6 @@ type CLevelDB struct { ro *levigo.ReadOptions wo *levigo.WriteOptions woSync *levigo.WriteOptions - - cwwMutex } func NewCLevelDB(name string, dir string) (*CLevelDB, error) { @@ -45,8 +43,6 @@ func NewCLevelDB(name string, dir string) (*CLevelDB, error) { ro: ro, wo: wo, woSync: woSync, - - cwwMutex: NewCWWMutex(), } return database, nil } @@ -59,6 +55,10 @@ func (db *CLevelDB) Get(key []byte) []byte { return res } +func (db *CLevelDB) Has(key []byte) bool { + panic("not implemented yet") +} + func (db *CLevelDB) Set(key []byte, value []byte) { err := db.db.Put(db.wo, key, value) if err != nil { @@ -99,9 +99,9 @@ func (db *CLevelDB) Close() { } func (db *CLevelDB) Print() { - itr := db.Iterator() - defer itr.Close() - for itr.Seek(nil); itr.Valid(); itr.Next() { + itr := db.Iterator(BeginningKey(), EndingKey()) + defer itr.Release() + for ; itr.Valid(); itr.Next() { key := itr.Key() value := itr.Value() fmt.Printf("[%X]:\t[%X]\n", key, value) @@ -120,10 +120,6 @@ func (db *CLevelDB) Stats() map[string]string { return stats } -func (db *CLevelDB) CacheDB() CacheDB { - return NewCacheDB(db, db.GetWriteLockVersion()) -} - //---------------------------------------- // Batch @@ -155,10 +151,19 @@ func (mBatch *cLevelDBBatch) Write() { //---------------------------------------- // Iterator -func (db *CLevelDB) Iterator() Iterator { - itr := db.db.NewIterator(db.ro) - itr.Seek([]byte{0x00}) - return cLevelDBIterator{itr} +func (db *CLevelDB) Iterator(start, end []byte) Iterator { + /* + XXX + itr := db.db.NewIterator(db.ro) + itr.Seek([]byte{0x00}) + return cLevelDBIterator{itr} + */ + return nil +} + +func (db *CLevelDB) ReverseIterator(start, end []byte) Iterator { + // XXX + return nil } type cLevelDBIterator struct { @@ -204,7 +209,7 @@ func (c cLevelDBIterator) Prev() { c.itr.Prev() } -func (c cLevelDBIterator) Close() { +func (c cLevelDBIterator) Release() { c.itr.Close() } diff --git a/db/cache_db.go b/db/cache_db.go deleted file mode 100644 index 586f2f67..00000000 --- a/db/cache_db.go +++ /dev/null @@ -1,233 +0,0 @@ -/* -package db - -import ( - "fmt" - "sort" - "sync" - "sync/atomic" -) - -// If value is nil but deleted is false, -// it means the parent doesn't have the key. -// (No need to delete upon Write()) -type cDBValue struct { - value []byte - deleted bool - dirty bool -} - -// cacheDB wraps an in-memory cache around an underlying DB. -type cacheDB struct { - mtx sync.Mutex - cache map[string]cDBValue - parent DB - lockVersion interface{} - - cwwMutex -} - -// Needed by MultiStore.CacheWrap(). -var _ atomicSetDeleter = (*cacheDB)(nil) -var _ CacheDB = (*cacheDB)(nil) - -// Users should typically not be required to call NewCacheDB directly, as the -// DB implementations here provide a .CacheDB() function already. -// `lockVersion` is typically provided by parent.GetWriteLockVersion(). -func NewCacheDB(parent DB, lockVersion interface{}) CacheDB { - db := &cacheDB{ - cache: make(map[string]cDBValue), - parent: parent, - lockVersion: lockVersion, - cwwMutex: NewCWWMutex(), - } - return db -} - -func (db *cacheDB) Get(key []byte) []byte { - db.mtx.Lock() - defer db.mtx.Unlock() - - dbValue, ok := db.cache[string(key)] - if !ok { - data := db.parent.Get(key) - dbValue = cDBValue{value: data, deleted: false, dirty: false} - db.cache[string(key)] = dbValue - } - return dbValue.value -} - -func (db *cacheDB) Set(key []byte, value []byte) { - db.mtx.Lock() - defer db.mtx.Unlock() - - db.SetNoLock(key, value) -} - -func (db *cacheDB) SetSync(key []byte, value []byte) { - db.mtx.Lock() - defer db.mtx.Unlock() - - db.SetNoLock(key, value) -} - -func (db *cacheDB) SetNoLock(key []byte, value []byte) { - db.cache[string(key)] = cDBValue{value: value, deleted: false, dirty: true} -} - -func (db *cacheDB) Delete(key []byte) { - db.mtx.Lock() - defer db.mtx.Unlock() - - db.DeleteNoLock(key) -} - -func (db *cacheDB) DeleteSync(key []byte) { - db.mtx.Lock() - defer db.mtx.Unlock() - - db.DeleteNoLock(key) -} - -func (db *cacheDB) DeleteNoLock(key []byte) { - db.cache[string(key)] = cDBValue{value: nil, deleted: true, dirty: true} -} - -func (db *cacheDB) Close() { - db.mtx.Lock() - defer db.mtx.Unlock() - - db.parent.Close() -} - -func (db *cacheDB) Print() { - db.mtx.Lock() - defer db.mtx.Unlock() - - fmt.Println("cacheDB\ncache:") - for key, value := range db.cache { - fmt.Printf("[%X]:\t[%v]\n", []byte(key), value) - } - fmt.Println("\nparent:") - db.parent.Print() -} - -func (db *cacheDB) Stats() map[string]string { - db.mtx.Lock() - defer db.mtx.Unlock() - - stats := make(map[string]string) - stats["cache.size"] = fmt.Sprintf("%d", len(db.cache)) - stats["cache.lock_version"] = fmt.Sprintf("%v", db.lockVersion) - mergeStats(db.parent.Stats(), stats, "parent.") - return stats -} - -func (db *cacheDB) Iterator() Iterator { - panic("cacheDB.Iterator() not yet supported") -} - -func (db *cacheDB) NewBatch() Batch { - return &memBatch{db, nil} -} - -// Implements `atomicSetDeleter` for Batch support. -func (db *cacheDB) Mutex() *sync.Mutex { - return &(db.mtx) -} - -// Write writes pending updates to the parent database and clears the cache. -func (db *cacheDB) Write() { - db.mtx.Lock() - defer db.mtx.Unlock() - - // Optional sanity check to ensure that cacheDB is valid - if parent, ok := db.parent.(WriteLocker); ok { - if parent.TryWriteLock(db.lockVersion) { - // All good! - } else { - panic("cacheDB.Write() failed. Did this CacheDB expire?") - } - } - - // We need a copy of all of the keys. - // Not the best, but probably not a bottleneck depending. - keys := make([]string, 0, len(db.cache)) - for key, dbValue := range db.cache { - if dbValue.dirty { - keys = append(keys, key) - } - } - sort.Strings(keys) - - batch := db.parent.NewBatch() - for _, key := range keys { - dbValue := db.cache[key] - if dbValue.deleted { - batch.Delete([]byte(key)) - } else if dbValue.value == nil { - // Skip, it already doesn't exist in parent. - } else { - batch.Set([]byte(key), dbValue.value) - } - } - batch.Write() - - // Clear the cache - db.cache = make(map[string]cDBValue) -} - -//---------------------------------------- -// To cache-wrap this cacheDB further. - -func (db *cacheDB) CacheDB() CacheDB { - return NewCacheDB(db, db.GetWriteLockVersion()) -} - -// If the parent parent DB implements this, (e.g. such as a cacheDB parent to a -// cacheDB child), cacheDB will call `parent.TryWriteLock()` before attempting -// to write. -type WriteLocker interface { - GetWriteLockVersion() (lockVersion interface{}) - TryWriteLock(lockVersion interface{}) bool -} - -// Implements TryWriteLocker. Embed this in DB structs if desired. -type cwwMutex struct { - mtx sync.Mutex - // CONTRACT: reading/writing to `*written` should use `atomic.*`. - // CONTRACT: replacing `written` with another *int32 should use `.mtx`. - written *int32 -} - -func NewCWWMutex() cwwMutex { - return cwwMutex{ - written: new(int32), - } -} - -func (cww *cwwMutex) GetWriteLockVersion() interface{} { - cww.mtx.Lock() - defer cww.mtx.Unlock() - - // `written` works as a "version" object because it gets replaced upon - // successful TryWriteLock. - return cww.written -} - -func (cww *cwwMutex) TryWriteLock(version interface{}) bool { - cww.mtx.Lock() - defer cww.mtx.Unlock() - - if version != cww.written { - return false // wrong "WriteLockVersion" - } - if !atomic.CompareAndSwapInt32(cww.written, 0, 1) { - return false // already written - } - - // New "WriteLockVersion" - cww.written = new(int32) - return true -} -*/ diff --git a/db/db.go b/db/db.go index ba137743..7eec04d5 100644 --- a/db/db.go +++ b/db/db.go @@ -1,5 +1,7 @@ package db +import "fmt" + //----------------------------------------------------------------------------- // Main entry @@ -26,7 +28,7 @@ func registerDBCreator(backend string, creator dbCreator, force bool) { func NewDB(name string, backend string, dir string) DB { db, err := backends[backend](name, dir) if err != nil { - PanicSanity(Fmt("Error initializing DB: %v", err)) + panic(fmt.Sprintf("Error initializing DB: %v", err)) } return db } diff --git a/db/fsdb.go b/db/fsdb.go index 4b191445..b6e08daf 100644 --- a/db/fsdb.go +++ b/db/fsdb.go @@ -7,7 +7,6 @@ import ( "os" "path" "path/filepath" - "sort" "sync" "github.com/pkg/errors" @@ -29,8 +28,6 @@ func init() { type FSDB struct { mtx sync.Mutex dir string - - cwwMutex } func NewFSDB(dir string) *FSDB { @@ -39,8 +36,7 @@ func NewFSDB(dir string) *FSDB { panic(errors.Wrap(err, "Creating FSDB dir "+dir)) } database := &FSDB{ - dir: dir, - cwwMutex: NewCWWMutex(), + dir: dir, } return database } @@ -59,6 +55,20 @@ func (db *FSDB) Get(key []byte) []byte { return value } +func (db *FSDB) Has(key []byte) bool { + db.mtx.Lock() + defer db.mtx.Unlock() + + path := db.nameToPath(key) + _, err := read(path) + if os.IsNotExist(err) { + return false + } else if err != nil { + panic(errors.Wrap(err, fmt.Sprintf("Getting key %s (0x%X)", string(key), key))) + } + return true +} + func (db *FSDB) Set(key []byte, value []byte) { db.mtx.Lock() defer db.mtx.Unlock() @@ -140,27 +150,32 @@ func (db *FSDB) Mutex() *sync.Mutex { return &(db.mtx) } -func (db *FSDB) CacheDB() CacheDB { - return NewCacheDB(db, db.GetWriteLockVersion()) +func (db *FSDB) Iterator(start, end []byte) Iterator { + /* + XXX + it := newMemDBIterator() + it.db = db + it.cur = 0 + + db.mtx.Lock() + defer db.mtx.Unlock() + + // We need a copy of all of the keys. + // Not the best, but probably not a bottleneck depending. + keys, err := list(db.dir) + if err != nil { + panic(errors.Wrap(err, fmt.Sprintf("Listing keys in %s", db.dir))) + } + sort.Strings(keys) + it.keys = keys + return it + */ + return nil } -func (db *FSDB) Iterator() Iterator { - it := newMemDBIterator() - it.db = db - it.cur = 0 - - db.mtx.Lock() - defer db.mtx.Unlock() - - // We need a copy of all of the keys. - // Not the best, but probably not a bottleneck depending. - keys, err := list(db.dir) - if err != nil { - panic(errors.Wrap(err, fmt.Sprintf("Listing keys in %s", db.dir))) - } - sort.Strings(keys) - it.keys = keys - return it +func (db *FSDB) ReverseIterator(start, end []byte) Iterator { + // XXX + return nil } func (db *FSDB) nameToPath(name []byte) string { diff --git a/db/go_level_db.go b/db/go_level_db.go index cffe7329..e8ed99de 100644 --- a/db/go_level_db.go +++ b/db/go_level_db.go @@ -22,8 +22,6 @@ func init() { type GoLevelDB struct { db *leveldb.DB - - cwwMutex } func NewGoLevelDB(name string, dir string) (*GoLevelDB, error) { @@ -33,8 +31,7 @@ func NewGoLevelDB(name string, dir string) (*GoLevelDB, error) { return nil, err } database := &GoLevelDB{ - db: db, - cwwMutex: NewCWWMutex(), + db: db, } return database, nil } @@ -51,6 +48,18 @@ func (db *GoLevelDB) Get(key []byte) []byte { return res } +func (db *GoLevelDB) Has(key []byte) bool { + _, err := db.db.Get(key, nil) + if err != nil { + if err == errors.ErrNotFound { + return false + } else { + PanicCrisis(err) + } + } + return true +} + func (db *GoLevelDB) Set(key []byte, value []byte) { err := db.db.Put(key, value, nil) if err != nil { @@ -121,10 +130,6 @@ func (db *GoLevelDB) Stats() map[string]string { return stats } -func (db *GoLevelDB) CacheDB() CacheDB { - return NewCacheDB(db, db.GetWriteLockVersion()) -} - //---------------------------------------- // Batch @@ -156,12 +161,21 @@ func (mBatch *goLevelDBBatch) Write() { //---------------------------------------- // Iterator -func (db *GoLevelDB) Iterator() Iterator { - itr := &goLevelDBIterator{ - source: db.db.NewIterator(nil, nil), - } - itr.Seek(nil) - return itr +func (db *GoLevelDB) Iterator(start, end []byte) Iterator { + /* + XXX + itr := &goLevelDBIterator{ + source: db.db.NewIterator(nil, nil), + } + itr.Seek(nil) + return itr + */ + return nil +} + +func (db *GoLevelDB) ReverseIterator(start, end []byte) Iterator { + // XXX + return nil } type goLevelDBIterator struct { diff --git a/db/mem_db.go b/db/mem_db.go index f5d55f3a..3127030a 100644 --- a/db/mem_db.go +++ b/db/mem_db.go @@ -3,7 +3,6 @@ package db import ( "bytes" "fmt" - "sort" "sync" ) @@ -16,14 +15,11 @@ func init() { type MemDB struct { mtx sync.Mutex db map[string][]byte - - cwwMutex } func NewMemDB() *MemDB { database := &MemDB{ - db: make(map[string][]byte), - cwwMutex: NewCWWMutex(), + db: make(map[string][]byte), } return database } @@ -35,6 +31,14 @@ func (db *MemDB) Get(key []byte) []byte { return db.db[string(key)] } +func (db *MemDB) Has(key []byte) bool { + db.mtx.Lock() + defer db.mtx.Unlock() + + _, ok := db.db[string(key)] + return ok +} + func (db *MemDB) Set(key []byte, value []byte) { db.mtx.Lock() defer db.mtx.Unlock() @@ -114,27 +118,32 @@ func (db *MemDB) Mutex() *sync.Mutex { return &(db.mtx) } -func (db *MemDB) CacheDB() CacheDB { - return NewCacheDB(db, db.GetWriteLockVersion()) -} - //---------------------------------------- -func (db *MemDB) Iterator() Iterator { - it := newMemDBIterator() - it.db = db - it.cur = 0 +func (db *MemDB) Iterator(start, end []byte) Iterator { + /* + XXX + it := newMemDBIterator() + it.db = db + it.cur = 0 - db.mtx.Lock() - defer db.mtx.Unlock() + db.mtx.Lock() + defer db.mtx.Unlock() - // We need a copy of all of the keys. - // Not the best, but probably not a bottleneck depending. - for key, _ := range db.db { - it.keys = append(it.keys, key) - } - sort.Strings(it.keys) - return it + // We need a copy of all of the keys. + // Not the best, but probably not a bottleneck depending. + for key, _ := range db.db { + it.keys = append(it.keys, key) + } + sort.Strings(it.keys) + return it + */ + return nil +} + +func (db *MemDB) ReverseIterator(start, end []byte) Iterator { + // XXX + return nil } type memDBIterator struct { diff --git a/db/types.go b/db/types.go index f343e1d7..7422a515 100644 --- a/db/types.go +++ b/db/types.go @@ -54,12 +54,23 @@ type SetDeleter interface { //---------------------------------------- +func BeginningKey() []byte { + return []byte{} +} + +func EndingKey() []byte { + return nil +} + /* Usage: - for itr.Seek(mykey); itr.Valid(); itr.Next() { + var itr Iterator = ... + defer itr.Release() + + for ; itr.Valid(); itr.Next() { k, v := itr.Key(); itr.Value() - .... + // ... } */ type Iterator interface { @@ -106,6 +117,6 @@ type Iterator interface { // This method is safe to call when Valid returns false. GetError() error - // Close deallocates the given Iterator. - Close() + // Release deallocates the given Iterator. + Release() } diff --git a/db/util.go b/db/util.go index 5f381a5b..89c77762 100644 --- a/db/util.go +++ b/db/util.go @@ -1,82 +1,35 @@ package db -import "bytes" - -// A wrapper around itr that tries to keep the iterator -// within the bounds as defined by `prefix` -type prefixIterator struct { - itr Iterator - prefix []byte - invalid bool -} - -func (pi *prefixIterator) Seek(key []byte) { - if !bytes.HasPrefix(key, pi.prefix) { - pi.invalid = true - return - } - pi.itr.Seek(key) - pi.checkInvalid() -} - -func (pi *prefixIterator) checkInvalid() { - if !pi.itr.Valid() { - pi.invalid = true - } -} - -func (pi *prefixIterator) Valid() bool { - if pi.invalid { - return false - } - key := pi.itr.Key() - ok := bytes.HasPrefix(key, pi.prefix) - if !ok { - pi.invalid = true - return false - } - return true -} - -func (pi *prefixIterator) Next() { - if pi.invalid { - panic("prefixIterator Next() called when invalid") - } - pi.itr.Next() - pi.checkInvalid() -} - -func (pi *prefixIterator) Prev() { - if pi.invalid { - panic("prefixIterator Prev() called when invalid") - } - pi.itr.Prev() - pi.checkInvalid() -} - -func (pi *prefixIterator) Key() []byte { - if pi.invalid { - panic("prefixIterator Key() called when invalid") - } - return pi.itr.Key() -} - -func (pi *prefixIterator) Value() []byte { - if pi.invalid { - panic("prefixIterator Value() called when invalid") - } - return pi.itr.Value() -} - -func (pi *prefixIterator) Close() { pi.itr.Close() } -func (pi *prefixIterator) GetError() error { return pi.itr.GetError() } - func IteratePrefix(db DB, prefix []byte) Iterator { - itr := db.Iterator() - pi := &prefixIterator{ - itr: itr, - prefix: prefix, + var start, end []byte + if len(prefix) == 0 { + start = BeginningKey() + end = EndingKey() + } else { + start = cp(prefix) + end = cpIncr(prefix) } - pi.Seek(prefix) - return pi + return db.Iterator(start, end) +} + +//---------------------------------------- + +func cp(bz []byte) (ret []byte) { + ret = make([]byte, len(bz)) + copy(ret, bz) + return ret +} + +// CONTRACT: len(bz) > 0 +func cpIncr(bz []byte) (ret []byte) { + ret = cp(bz) + for i := len(bz) - 1; i >= 0; i-- { + if ret[i] < byte(0xFF) { + ret[i] += 1 + return + } else { + ret[i] = byte(0x00) + } + } + return EndingKey() }