providers: optimize GC

1. Don't be n^2.
2. Don't bother walking the cache, just drop it.
Steven Allen 2019-04-12 22:03:39 -07:00
parent a3b9767038
commit fbb29ea6b6
2 changed files with 54 additions and 101 deletions
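
The change replaces the old per-key cleanup (walk every provider key, load its provider set, filter it, then rewrite or delete it) with a single scan over the provider records that deletes anything older than ProvideValidity, and it simply purges the LRU cache instead of keeping it in sync with the GC. A minimal sketch of that single-pass pattern, written against the context-free go-datastore query API used in this diff; gcOnce, prefix, validity and parseTime are stand-ins for the package's gc(), providersKeyPrefix, ProvideValidity and readTimeValue:

    package providergc // hypothetical package for this sketch

    import (
        "time"

        ds "github.com/ipfs/go-datastore"
        dsq "github.com/ipfs/go-datastore/query"
    )

    // gcOnce walks all records under prefix once and deletes the expired ones.
    func gcOnce(dstore ds.Datastore, prefix string, validity time.Duration,
        parseTime func([]byte) (time.Time, error)) error {

        res, err := dstore.Query(dsq.Query{Prefix: prefix})
        if err != nil {
            return err
        }
        defer res.Close()

        now := time.Now()
        for e := range res.Next() {
            if e.Error != nil {
                continue
            }
            t, err := parseTime(e.Value)
            if err != nil || now.Sub(t) > validity {
                // Unparseable or expired: delete the record in place.
                // No per-key re-query, no provider-set rewrite.
                if err := dstore.Delete(ds.RawKey(e.Key)); err != nil {
                    return err
                }
            }
        }
        return nil
    }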

@@ -68,7 +68,7 @@ func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Batching)
     pm.proc = goprocessctx.WithContext(ctx)
     pm.cleanupInterval = defaultCleanupInterval
-    pm.proc.Go(func(p goprocess.Process) { pm.run() })
+    pm.proc.Go(pm.run)
     return pm
 }
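
pm.run can now be handed to proc.Go directly because its new signature matches goprocess.ProcessFunc, i.e. func(goprocess.Process), and the loop shuts down by selecting on the process it was given rather than reaching back through pm.proc. A rough sketch of that pattern, assuming the jbenet/goprocess API used above; startLoop is a hypothetical helper, not part of this package:

    package procloop // hypothetical package for this sketch

    import (
        "context"
        "time"

        goprocess "github.com/jbenet/goprocess"
        goprocessctx "github.com/jbenet/goprocess/context"
    )

    // startLoop runs work every interval on its own goprocess.Process and
    // stops when that process is closed (or its context is cancelled).
    func startLoop(ctx context.Context, interval time.Duration, work func()) goprocess.Process {
        proc := goprocessctx.WithContext(ctx)
        proc.Go(func(p goprocess.Process) {
            tick := time.NewTicker(interval)
            defer tick.Stop()
            for {
                select {
                case <-tick.C:
                    work()
                case <-p.Closing():
                    return
                }
            }
        })
        return proc
    }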
@@ -191,69 +191,49 @@ func writeProviderEntry(dstore ds.Datastore, k cid.Cid, p peer.ID, t time.Time)
     return dstore.Put(ds.NewKey(dsk), buf[:n])
 }
 
-func (pm *ProviderManager) deleteProvSet(k cid.Cid) error {
-    pm.providers.Remove(k)
-
-    res, err := pm.dstore.Query(dsq.Query{
-        KeysOnly: true,
-        Prefix:   mkProvKey(k),
-    })
-    if err != nil {
-        return err
-    }
-
-    entries, err := res.Rest()
-    if err != nil {
-        return err
-    }
-
-    for _, e := range entries {
-        err := pm.dstore.Delete(ds.NewKey(e.Key))
-        if err != nil {
-            log.Error("deleting provider set: ", err)
-        }
-    }
-    return nil
-}
-
-func (pm *ProviderManager) getProvKeys() (func() (cid.Cid, bool), error) {
+func (pm *ProviderManager) gc() {
     res, err := pm.dstore.Query(dsq.Query{
         KeysOnly: true,
         Prefix:   providersKeyPrefix,
     })
     if err != nil {
-        return nil, err
+        log.Error("error garbage collecting provider records: ", err)
+        return
     }
+    defer res.Close()
 
-    iter := func() (cid.Cid, bool) {
-        for e := range res.Next() {
-            parts := strings.Split(e.Key, "/")
-            if len(parts) != 4 {
-                log.Warningf("incorrectly formatted provider entry in datastore: %s", e.Key)
-                continue
-            }
-            decoded, err := base32.RawStdEncoding.DecodeString(parts[2])
-            if err != nil {
-                log.Warning("error decoding base32 provider key: %s: %s", parts[2], err)
-                continue
-            }
-
-            c, err := cid.Cast(decoded)
-            if err != nil {
-                log.Warning("error casting key to cid from datastore key: %s", err)
-                continue
-            }
-
-            return c, true
-        }
-        return cid.Cid{}, false
-    }
-
-    return iter, nil
+    now := time.Now()
+    for {
+        e, ok := res.NextSync()
+        if !ok {
+            return
+        }
+        if e.Error != nil {
+            log.Error("got an error: ", e.Error)
+            continue
+        }
+
+        // check expiration time
+        t, err := readTimeValue(e.Value)
+        switch {
+        case err != nil:
+            // couldn't parse the time
+            log.Warning("parsing providers record from disk: ", err)
+            fallthrough
+        case now.Sub(t) > ProvideValidity:
+            // or just expired
+            err = pm.dstore.Delete(ds.RawKey(e.Key))
+            if err != nil {
+                log.Warning("failed to remove provider record from disk: ", err)
+            }
+        }
+    }
 }
 
-func (pm *ProviderManager) run() {
+func (pm *ProviderManager) run(proc goprocess.Process) {
     tick := time.NewTicker(pm.cleanupInterval)
+    defer tick.Stop()
     for {
         select {
         case np := <-pm.newprovs:
@@ -267,47 +247,16 @@ func (pm *ProviderManager) run() {
                 log.Error("error reading providers: ", err)
             }
-            gp.resp <- provs
+            // set the cap so the user can't append to this.
+            gp.resp <- provs[0:len(provs):len(provs)]
         case <-tick.C:
-            keys, err := pm.getProvKeys()
-            if err != nil {
-                log.Error("Error loading provider keys: ", err)
-                continue
-            }
-            now := time.Now()
-            for {
-                k, ok := keys()
-                if !ok {
-                    break
-                }
-
-                provs, err := pm.getProvSet(k)
-                if err != nil {
-                    log.Error("error loading known provset: ", err)
-                    continue
-                }
-                for p, t := range provs.set {
-                    if now.Sub(t) > ProvideValidity {
-                        delete(provs.set, p)
-                    }
-                }
-                // have we run out of providers?
-                if len(provs.set) == 0 {
-                    provs.providers = nil
-                    err := pm.deleteProvSet(k)
-                    if err != nil {
-                        log.Error("error deleting provider set: ", err)
-                    }
-                } else if len(provs.set) < len(provs.providers) {
-                    // We must have modified the providers set, recompute.
-                    provs.providers = make([]peer.ID, 0, len(provs.set))
-                    for p := range provs.set {
-                        provs.providers = append(provs.providers, p)
-                    }
-                }
-            }
-        case <-pm.proc.Closing():
-            tick.Stop()
+            // You know the wonderful thing about caches? You can
+            // drop them.
+            //
+            // Much faster than GCing.
+            pm.providers.Purge()
+            pm.gc()
+        case <-proc.Closing():
             return
         }
     }
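
One detail in the hunk above: GetProviders responses are now sent as provs[0:len(provs):len(provs)], a full slice expression that sets the result's capacity equal to its length, so a caller that appends to the returned slice forces an allocation instead of writing into the manager's backing array. A standalone illustration:

    package main

    import "fmt"

    func main() {
        backing := []int{1, 2, 3, 4}

        plain := backing[0:2]    // len 2, cap 4: shares the spare room
        capped := backing[0:2:2] // len 2, cap 2: no spare room

        _ = append(plain, 99)  // fits in cap, so it overwrites backing[2]
        _ = append(capped, 77) // exceeds cap, so it copies to a new array

        fmt.Println(backing) // [1 2 99 4]: only the uncapped append leaked through
    }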

@@ -10,6 +10,8 @@ import (
     cid "github.com/ipfs/go-cid"
     ds "github.com/ipfs/go-datastore"
+    dsq "github.com/ipfs/go-datastore/query"
+    dssync "github.com/ipfs/go-datastore/sync"
     u "github.com/ipfs/go-ipfs-util"
     peer "github.com/libp2p/go-libp2p-peer"
     //
@@ -22,7 +24,7 @@ func TestProviderManager(t *testing.T) {
     defer cancel()
     mid := peer.ID("testing")
-    p := NewProviderManager(ctx, mid, ds.NewMapDatastore())
+    p := NewProviderManager(ctx, mid, dssync.MutexWrap(ds.NewMapDatastore()))
     a := cid.NewCidV0(u.Hash([]byte("test")))
     p.AddProvider(ctx, a, peer.ID("testingprovider"))
     resp := p.GetProviders(ctx, a)
@@ -41,7 +43,7 @@ func TestProvidersDatastore(t *testing.T) {
     defer cancel()
     mid := peer.ID("testing")
-    p := NewProviderManager(ctx, mid, ds.NewMapDatastore())
+    p := NewProviderManager(ctx, mid, dssync.MutexWrap(ds.NewMapDatastore()))
     defer p.proc.Close()
 
     friend := peer.ID("friend")
@@ -64,7 +66,7 @@ func TestProvidersDatastore(t *testing.T) {
 }
 
 func TestProvidersSerialization(t *testing.T) {
-    dstore := ds.NewMapDatastore()
+    dstore := dssync.MutexWrap(ds.NewMapDatastore())
 
     k := cid.NewCidV0(u.Hash(([]byte("my key!"))))
     p1 := peer.ID("peer one")
@@ -120,7 +122,7 @@ func TestProvidesExpire(t *testing.T) {
     defer cancel()
 
     mid := peer.ID("testing")
-    p := NewProviderManager(ctx, mid, ds.NewMapDatastore())
+    p := NewProviderManager(ctx, mid, dssync.MutexWrap(ds.NewMapDatastore()))
 
     peers := []peer.ID{"a", "b"}
     var cids []cid.Cid
@@ -150,13 +152,15 @@ func TestProvidesExpire(t *testing.T) {
         t.Fatal("providers map not cleaned up")
     }
 
-    proviter, err := p.getProvKeys()
+    res, err := p.dstore.Query(dsq.Query{Prefix: providersKeyPrefix})
     if err != nil {
         t.Fatal(err)
     }
-
-    _, ok := proviter()
-    if ok {
+    rest, err := res.Rest()
+    if err != nil {
+        t.Fatal(err)
+    }
+    if len(rest) > 0 {
         t.Fatal("expected everything to be cleaned out of the datastore")
     }
 }
@@ -229,7 +233,7 @@ func TestUponCacheMissProvidersAreReadFromDatastore(t *testing.T) {
     p1, p2 := peer.ID("a"), peer.ID("b")
     c1 := cid.NewCidV1(cid.DagCBOR, u.Hash([]byte("1")))
     c2 := cid.NewCidV1(cid.DagCBOR, u.Hash([]byte("2")))
-    pm := NewProviderManager(ctx, p1, ds.NewMapDatastore())
+    pm := NewProviderManager(ctx, p1, dssync.MutexWrap(ds.NewMapDatastore()))
 
     pm.AddProvider(ctx, c1, p1)
 
     // make the cached provider for c1 go to datastore
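
The tests switch to dssync.MutexWrap(ds.NewMapDatastore()), presumably because the plain MapDatastore is not safe for concurrent use and the provider manager's run goroutine now queries and deletes records while the test goroutine keeps adding providers. A minimal sketch of the wrapping (the key and value are placeholders; Put uses the context-free signature this diff targets):

    package main

    import (
        ds "github.com/ipfs/go-datastore"
        dssync "github.com/ipfs/go-datastore/sync"
    )

    func main() {
        // MutexWrap guards every operation of the wrapped datastore with a
        // mutex, so the in-memory MapDatastore can be shared between
        // goroutines (here: the tests and the manager's run loop).
        dstore := dssync.MutexWrap(ds.NewMapDatastore())
        _ = dstore.Put(ds.NewKey("/example"), []byte("record"))
    }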