mirror of
https://github.com/fluencelabs/go-libp2p-kad-dht
synced 2025-04-24 22:32:13 +00:00
use an iterator for provider key sweeping
This commit is contained in:
parent
044ea16a66
commit
b17e206c81
@ -206,44 +206,40 @@ func (pm *ProviderManager) deleteProvSet(k *cid.Cid) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (pm *ProviderManager) getAllProvKeys() ([]*cid.Cid, error) {
|
||||
func (pm *ProviderManager) getProvKeys() (func() (*cid.Cid, bool), error) {
|
||||
res, err := pm.dstore.Query(dsq.Query{
|
||||
KeysOnly: true,
|
||||
KeysOnly: false,
|
||||
Prefix: providersKeyPrefix,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
entries, err := res.Rest()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
iter := func() (*cid.Cid, bool) {
|
||||
for e := range res.Next() {
|
||||
parts := strings.Split(e.Key, "/")
|
||||
if len(parts) != 4 {
|
||||
log.Warning("incorrectly formatted provider entry in datastore")
|
||||
continue
|
||||
}
|
||||
decoded, err := base32.RawStdEncoding.DecodeString(parts[2])
|
||||
if err != nil {
|
||||
log.Warning("error decoding base32 provider key")
|
||||
continue
|
||||
}
|
||||
|
||||
c, err := cid.Cast(decoded)
|
||||
if err != nil {
|
||||
log.Warning("error casting key to cid from datastore key")
|
||||
continue
|
||||
}
|
||||
|
||||
return c, true
|
||||
}
|
||||
return nil, false
|
||||
}
|
||||
|
||||
seen := cid.NewSet()
|
||||
for _, e := range entries {
|
||||
parts := strings.Split(e.Key, "/")
|
||||
if len(parts) != 4 {
|
||||
log.Warning("incorrectly formatted provider entry in datastore")
|
||||
continue
|
||||
}
|
||||
decoded, err := base32.RawStdEncoding.DecodeString(parts[2])
|
||||
if err != nil {
|
||||
log.Warning("error decoding base32 provider key")
|
||||
continue
|
||||
}
|
||||
|
||||
c, err := cid.Cast(decoded)
|
||||
if err != nil {
|
||||
log.Warning("error casting key to cid from datastore key")
|
||||
continue
|
||||
}
|
||||
|
||||
seen.Add(c)
|
||||
}
|
||||
|
||||
return seen.Keys(), nil
|
||||
return iter, nil
|
||||
}
|
||||
|
||||
func (pm *ProviderManager) run() {
|
||||
@ -263,12 +259,17 @@ func (pm *ProviderManager) run() {
|
||||
|
||||
gp.resp <- provs
|
||||
case <-tick.C:
|
||||
keys, err := pm.getAllProvKeys()
|
||||
keys, err := pm.getProvKeys()
|
||||
if err != nil {
|
||||
log.Error("Error loading provider keys: ", err)
|
||||
continue
|
||||
}
|
||||
for _, k := range keys {
|
||||
for {
|
||||
k, ok := keys()
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
|
||||
provs, err := pm.getProvSet(k)
|
||||
if err != nil {
|
||||
log.Error("error loading known provset: ", err)
|
||||
|
@ -10,7 +10,7 @@ import (
|
||||
|
||||
cid "github.com/ipfs/go-cid"
|
||||
ds "github.com/ipfs/go-datastore"
|
||||
lds "github.com/ipfs/go-ds-leveldb"
|
||||
//lds "github.com/ipfs/go-ds-leveldb"
|
||||
u "github.com/ipfs/go-ipfs-util"
|
||||
peer "github.com/libp2p/go-libp2p-peer"
|
||||
)
|
||||
@ -142,17 +142,21 @@ func TestProvidesExpire(t *testing.T) {
|
||||
t.Fatal("providers map not cleaned up")
|
||||
}
|
||||
|
||||
allprovs, err := p.getAllProvKeys()
|
||||
proviter, err := p.getProvKeys()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if len(allprovs) != 0 {
|
||||
_, ok := proviter()
|
||||
if ok {
|
||||
t.Fatal("expected everything to be cleaned out of the datastore")
|
||||
}
|
||||
}
|
||||
|
||||
///* This can be used for profiling. Keeping it commented out for now to avoid incurring extra CI time
|
||||
var _ = ioutil.NopCloser
|
||||
var _ = os.DevNull
|
||||
|
||||
/* This can be used for profiling. Keeping it commented out for now to avoid incurring extra CI time
|
||||
func TestLargeProvidersSet(t *testing.T) {
|
||||
old := lruCacheSize
|
||||
lruCacheSize = 10
|
||||
@ -201,3 +205,4 @@ func TestLargeProvidersSet(t *testing.T) {
|
||||
}
|
||||
|
||||
}
|
||||
//*/
|
||||
|
Loading…
x
Reference in New Issue
Block a user