From dffa2f8b27f804ef3b315d336b46d7d28acdf1b0 Mon Sep 17 00:00:00 2001
From: David Dias
Date: Fri, 13 Mar 2020 23:34:30 +0000
Subject: [PATCH] refactor: ProviderManager (#492)

* test: augment TestProviderManager test, add notes of future tests

* refactor(provider-manager): order funcs, update names for consistency, add code docs
---
 providers/provider_set.go                     |  34 ++
 .../{providers.go => providers_manager.go}    | 387 +++++++++---------
 ...ders_test.go => providers_manager_test.go} |  15 +-
 3 files changed, 232 insertions(+), 204 deletions(-)
 create mode 100644 providers/provider_set.go
 rename providers/{providers.go => providers_manager.go} (85%)
 rename providers/{providers_test.go => providers_manager_test.go} (94%)

diff --git a/providers/provider_set.go b/providers/provider_set.go
new file mode 100644
index 0000000..4b52950
--- /dev/null
+++ b/providers/provider_set.go
@@ -0,0 +1,34 @@
+package providers
+
+import (
+	"time"
+
+	"github.com/libp2p/go-libp2p-core/peer"
+)
+
+// A providerSet has the list of providers and the time that they were added
+// It is used as an intermediary data struct between what is stored in the datastore
+// and the list of providers that get passed to the consumer of a .GetProviders call
+type providerSet struct {
+	providers []peer.ID
+	set       map[peer.ID]time.Time
+}
+
+func newProviderSet() *providerSet {
+	return &providerSet{
+		set: make(map[peer.ID]time.Time),
+	}
+}
+
+func (ps *providerSet) Add(p peer.ID) {
+	ps.setVal(p, time.Now())
+}
+
+func (ps *providerSet) setVal(p peer.ID, t time.Time) {
+	_, found := ps.set[p]
+	if !found {
+		ps.providers = append(ps.providers, p)
+	}
+
+	ps.set[p] = t
+}
diff --git a/providers/providers.go b/providers/providers_manager.go
similarity index 85%
rename from providers/providers.go
rename to providers/providers_manager.go
index 68e2f09..2963f4c 100644
--- a/providers/providers.go
+++ b/providers/providers_manager.go
@@ -19,19 +19,24 @@ import (
 	base32 "github.com/multiformats/go-base32"
 )
 
-var batchBufferSize = 256
+// ProvidersKeyPrefix is the prefix/namespace for ALL provider record
+// keys stored in the data store.
+const ProvidersKeyPrefix = "/providers/"
 
-var log = logging.Logger("providers")
-
-var lruCacheSize = 256
+// ProvideValidity is the default time that a provider record should last
 var ProvideValidity = time.Hour * 24
 var defaultCleanupInterval = time.Hour
+var lruCacheSize = 256
+var batchBufferSize = 256
+var log = logging.Logger("providers")
 
+// ProviderManager adds and pulls providers out of the datastore,
+// caching them in between
 type ProviderManager struct {
 	// all non channel fields are meant to be accessed only within
 	// the run method
-	providers *lru.LRU
-	dstore    *autobatch.Datastore
+	cache  *lru.LRU
+	dstore *autobatch.Datastore
 
 	newprovs chan *addProv
 	getprovs chan *getProv
@@ -40,21 +45,17 @@ type ProviderManager struct {
 	cleanupInterval time.Duration
 }
 
-type providerSet struct {
-	providers []peer.ID
-	set       map[peer.ID]time.Time
-}
-
 type addProv struct {
-	k   []byte
+	key []byte
 	val peer.ID
 }
 
 type getProv struct {
-	k    []byte
+	key  []byte
 	resp chan []peer.ID
 }
 
+// NewProviderManager constructor
 func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Batching) *ProviderManager {
 	pm := new(ProviderManager)
 	pm.getprovs = make(chan *getProv)
@@ -64,7 +65,7 @@ func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Batching)
 	if err != nil {
 		panic(err) //only happens if negative value is passed to lru constructor
 	}
-	pm.providers = cache
+	pm.cache = cache
 
 	pm.proc = goprocessctx.WithContext(ctx)
 	pm.cleanupInterval = defaultCleanupInterval
@@ -73,45 +74,203 @@ func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Batching)
 	return pm
 }
 
-// ProvidersKeyPrefix is the prefix/namespace for ALL provider record
-// keys stored in the data store.
-const ProvidersKeyPrefix = "/providers/"
+// Process returns the ProviderManager process
+func (pm *ProviderManager) Process() goprocess.Process {
+	return pm.proc
+}
+
+func (pm *ProviderManager) run(proc goprocess.Process) {
+	var (
+		gcQuery    dsq.Results
+		gcQueryRes <-chan dsq.Result
+		gcSkip     map[string]struct{}
+		gcTime     time.Time
+		gcTimer    = time.NewTimer(pm.cleanupInterval)
+	)
+
+	defer func() {
+		gcTimer.Stop()
+		if gcQuery != nil {
+			// don't really care if this fails.
+			_ = gcQuery.Close()
+		}
+		if err := pm.dstore.Flush(); err != nil {
+			log.Error("failed to flush datastore: ", err)
+		}
+	}()
+
+	for {
+		select {
+		case np := <-pm.newprovs:
+			err := pm.addProv(np.key, np.val)
+			if err != nil {
+				log.Error("error adding new providers: ", err)
+				continue
+			}
+			if gcSkip != nil {
+				// we have a GC, tell it to skip this provider
+				// as we've updated it since the GC started.
+				gcSkip[mkProvKeyFor(np.key, np.val)] = struct{}{}
+			}
+		case gp := <-pm.getprovs:
+			provs, err := pm.getProvidersForKey(gp.key)
+			if err != nil && err != ds.ErrNotFound {
+				log.Error("error reading providers: ", err)
+			}
+
+			// set the cap so the user can't append to this.
+			gp.resp <- provs[0:len(provs):len(provs)]
+		case res, ok := <-gcQueryRes:
+			if !ok {
+				if err := gcQuery.Close(); err != nil {
+					log.Error("failed to close provider GC query: ", err)
+				}
+				gcTimer.Reset(pm.cleanupInterval)
+
+				// cleanup GC round
+				gcQueryRes = nil
+				gcSkip = nil
+				gcQuery = nil
+				continue
+			}
+			if res.Error != nil {
+				log.Error("got error from GC query: ", res.Error)
+				continue
+			}
+			if _, ok := gcSkip[res.Key]; ok {
+				// We've updated this record since starting the
+				// GC round, skip it.
+				continue
+			}
+
+			// check expiration time
+			t, err := readTimeValue(res.Value)
+			switch {
+			case err != nil:
+				// couldn't parse the time
+				log.Error("parsing providers record from disk: ", err)
+				fallthrough
+			case gcTime.Sub(t) > ProvideValidity:
+				// or expired
+				err = pm.dstore.Delete(ds.RawKey(res.Key))
+				if err != nil && err != ds.ErrNotFound {
+					log.Error("failed to remove provider record from disk: ", err)
+				}
+			}
+
+		case gcTime = <-gcTimer.C:
+			// You know the wonderful thing about caches? You can
+			// drop them.
+			//
+			// Much faster than GCing.
+			pm.cache.Purge()
+
+			// Now, kick off a GC of the datastore.
+			q, err := pm.dstore.Query(dsq.Query{
+				Prefix: ProvidersKeyPrefix,
+			})
+			if err != nil {
+				log.Error("provider record GC query failed: ", err)
+				continue
+			}
+			gcQuery = q
+			gcQueryRes = q.Next()
+			gcSkip = make(map[string]struct{})
+		case <-proc.Closing():
+			return
+		}
+	}
+}
+
+// AddProvider adds a provider
+func (pm *ProviderManager) AddProvider(ctx context.Context, k []byte, val peer.ID) {
+	prov := &addProv{
+		key: k,
+		val: val,
+	}
+	select {
+	case pm.newprovs <- prov:
+	case <-ctx.Done():
+	}
+}
+
+// addProv updates the cache if needed
+func (pm *ProviderManager) addProv(k []byte, p peer.ID) error {
+	now := time.Now()
+	if provs, ok := pm.cache.Get(string(k)); ok {
+		provs.(*providerSet).setVal(p, now)
+	} // else not cached, just write through
+
+	return writeProviderEntry(pm.dstore, k, p, now)
+}
+
+// writeProviderEntry writes the provider into the datastore
+func writeProviderEntry(dstore ds.Datastore, k []byte, p peer.ID, t time.Time) error {
+	dsk := mkProvKeyFor(k, p)
+
+	buf := make([]byte, 16)
+	n := binary.PutVarint(buf, t.UnixNano())
+
+	return dstore.Put(ds.NewKey(dsk), buf[:n])
+}
+
+func mkProvKeyFor(k []byte, p peer.ID) string {
+	return mkProvKey(k) + "/" + base32.RawStdEncoding.EncodeToString([]byte(p))
+}
 
 func mkProvKey(k []byte) string {
 	return ProvidersKeyPrefix + base32.RawStdEncoding.EncodeToString(k)
 }
 
-func (pm *ProviderManager) Process() goprocess.Process {
-	return pm.proc
+// GetProviders returns the set of providers for the given key.
+// This method _does not_ copy the set. Do not modify it.
+func (pm *ProviderManager) GetProviders(ctx context.Context, k []byte) []peer.ID {
+	gp := &getProv{
+		key:  k,
+		resp: make(chan []peer.ID, 1), // buffered to prevent sender from blocking
+	}
+	select {
+	case <-ctx.Done():
+		return nil
+	case pm.getprovs <- gp:
+	}
+	select {
+	case <-ctx.Done():
+		return nil
+	case peers := <-gp.resp:
+		return peers
+	}
 }
 
-func (pm *ProviderManager) providersForKey(k []byte) ([]peer.ID, error) {
-	pset, err := pm.getProvSet(k)
+func (pm *ProviderManager) getProvidersForKey(k []byte) ([]peer.ID, error) {
+	pset, err := pm.getProviderSetForKey(k)
 	if err != nil {
 		return nil, err
 	}
 	return pset.providers, nil
 }
 
-func (pm *ProviderManager) getProvSet(k []byte) (*providerSet, error) {
-	cached, ok := pm.providers.Get(string(k))
+// returns the ProviderSet if it already exists in the cache, otherwise loads it from the datastore
+func (pm *ProviderManager) getProviderSetForKey(k []byte) (*providerSet, error) {
+	cached, ok := pm.cache.Get(string(k))
 	if ok {
 		return cached.(*providerSet), nil
 	}
 
-	pset, err := loadProvSet(pm.dstore, k)
+	pset, err := loadProviderSet(pm.dstore, k)
 	if err != nil {
 		return nil, err
 	}
 
 	if len(pset.providers) > 0 {
-		pm.providers.Add(string(k), pset)
+		pm.cache.Add(string(k), pset)
 	}
 
 	return pset, nil
 }
 
-func loadProvSet(dstore ds.Datastore, k []byte) (*providerSet, error) {
+// loads the ProviderSet out of the datastore
+func loadProviderSet(dstore ds.Datastore, k []byte) (*providerSet, error) {
 	res, err := dstore.Query(dsq.Query{Prefix: mkProvKey(k)})
 	if err != nil {
 		return nil, err
 	}
@@ -174,179 +333,3 @@ func readTimeValue(data []byte) (time.Time, error) {
 
 	return time.Unix(0, nsec), nil
 }
-
-func (pm *ProviderManager) addProv(k []byte, p peer.ID) error {
-	now := time.Now()
-	if provs, ok := pm.providers.Get(string(k)); ok {
-		provs.(*providerSet).setVal(p, now)
-	} // else not cached, just write through
-
-	return writeProviderEntry(pm.dstore, k, p, now)
-}
-
-func mkProvKeyFor(k []byte, p peer.ID) string {
-	return mkProvKey(k) + "/" + base32.RawStdEncoding.EncodeToString([]byte(p))
-}
-
-func writeProviderEntry(dstore ds.Datastore, k []byte, p peer.ID, t time.Time) error {
-	dsk := mkProvKeyFor(k, p)
-
-	buf := make([]byte, 16)
-	n := binary.PutVarint(buf, t.UnixNano())
-
-	return dstore.Put(ds.NewKey(dsk), buf[:n])
-}
-
-func (pm *ProviderManager) run(proc goprocess.Process) {
-	var (
-		gcQuery    dsq.Results
-		gcQueryRes <-chan dsq.Result
-		gcSkip     map[string]struct{}
-		gcTime     time.Time
-		gcTimer    = time.NewTimer(pm.cleanupInterval)
-	)
-
-	defer func() {
-		gcTimer.Stop()
-		if gcQuery != nil {
-			// don't really care if this fails.
-			_ = gcQuery.Close()
-		}
-		if err := pm.dstore.Flush(); err != nil {
-			log.Error("failed to flush datastore: ", err)
-		}
-	}()
-
-	for {
-		select {
-		case np := <-pm.newprovs:
-			err := pm.addProv(np.k, np.val)
-			if err != nil {
-				log.Error("error adding new providers: ", err)
-				continue
-			}
-			if gcSkip != nil {
-				// we have an gc, tell it to skip this provider
-				// as we've updated it since the GC started.
-				gcSkip[mkProvKeyFor(np.k, np.val)] = struct{}{}
-			}
-		case gp := <-pm.getprovs:
-			provs, err := pm.providersForKey(gp.k)
-			if err != nil && err != ds.ErrNotFound {
-				log.Error("error reading providers: ", err)
-			}
-
-			// set the cap so the user can't append to this.
-			gp.resp <- provs[0:len(provs):len(provs)]
-		case res, ok := <-gcQueryRes:
-			if !ok {
-				if err := gcQuery.Close(); err != nil {
-					log.Error("failed to close provider GC query: ", err)
-				}
-				gcTimer.Reset(pm.cleanupInterval)
-
-				// cleanup GC round
-				gcQueryRes = nil
-				gcSkip = nil
-				gcQuery = nil
-				continue
-			}
-			if res.Error != nil {
-				log.Error("got error from GC query: ", res.Error)
-				continue
-			}
-			if _, ok := gcSkip[res.Key]; ok {
-				// We've updated this record since starting the
-				// GC round, skip it.
-				continue
-			}
-
-			// check expiration time
-			t, err := readTimeValue(res.Value)
-			switch {
-			case err != nil:
-				// couldn't parse the time
-				log.Error("parsing providers record from disk: ", err)
-				fallthrough
-			case gcTime.Sub(t) > ProvideValidity:
-				// or expired
-				err = pm.dstore.Delete(ds.RawKey(res.Key))
-				if err != nil && err != ds.ErrNotFound {
-					log.Error("failed to remove provider record from disk: ", err)
-				}
-			}
-
-		case gcTime = <-gcTimer.C:
-			// You know the wonderful thing about caches? You can
-			// drop them.
-			//
-			// Much faster than GCing.
-			pm.providers.Purge()
-
-			// Now, kick off a GC of the datastore.
-			q, err := pm.dstore.Query(dsq.Query{
-				Prefix: ProvidersKeyPrefix,
-			})
-			if err != nil {
-				log.Error("provider record GC query failed: ", err)
-				continue
-			}
-			gcQuery = q
-			gcQueryRes = q.Next()
-			gcSkip = make(map[string]struct{})
-		case <-proc.Closing():
-			return
-		}
-	}
-}
-
-// AddProvider adds a provider.
-func (pm *ProviderManager) AddProvider(ctx context.Context, k []byte, val peer.ID) {
-	prov := &addProv{
-		k:   k,
-		val: val,
-	}
-	select {
-	case pm.newprovs <- prov:
-	case <-ctx.Done():
-	}
-}
-
-// GetProviders returns the set of providers for the given key.
-// This method _does not_ copy the set. Do not modify it.
-func (pm *ProviderManager) GetProviders(ctx context.Context, k []byte) []peer.ID { - gp := &getProv{ - k: k, - resp: make(chan []peer.ID, 1), // buffered to prevent sender from blocking - } - select { - case <-ctx.Done(): - return nil - case pm.getprovs <- gp: - } - select { - case <-ctx.Done(): - return nil - case peers := <-gp.resp: - return peers - } -} - -func newProviderSet() *providerSet { - return &providerSet{ - set: make(map[peer.ID]time.Time), - } -} - -func (ps *providerSet) Add(p peer.ID) { - ps.setVal(p, time.Now()) -} - -func (ps *providerSet) setVal(p peer.ID, t time.Time) { - _, found := ps.set[p] - if !found { - ps.providers = append(ps.providers, p) - } - - ps.set[p] = t -} diff --git a/providers/providers_test.go b/providers/providers_manager_test.go similarity index 94% rename from providers/providers_test.go rename to providers/providers_manager_test.go index 56a3345..108ee87 100644 --- a/providers/providers_test.go +++ b/providers/providers_manager_test.go @@ -31,16 +31,27 @@ func TestProviderManager(t *testing.T) { p.AddProvider(ctx, a, peer.ID("testingprovider")) // Not cached + // TODO verify that cache is empty resp := p.GetProviders(ctx, a) if len(resp) != 1 { t.Fatal("Could not retrieve provider.") } // Cached + // TODO verify that cache is populated resp = p.GetProviders(ctx, a) if len(resp) != 1 { t.Fatal("Could not retrieve provider.") } + + p.AddProvider(ctx, a, peer.ID("testingprovider2")) + p.AddProvider(ctx, a, peer.ID("testingprovider3")) + // TODO verify that cache is already up to date + resp = p.GetProviders(ctx, a) + if len(resp) != 3 { + t.Fatalf("Should have got 3 providers, got %d", len(resp)) + } + p.proc.Close() } @@ -94,7 +105,7 @@ func TestProvidersSerialization(t *testing.T) { t.Fatal(err) } - pset, err := loadProvSet(dstore, k) + pset, err := loadProviderSet(dstore, k) if err != nil { t.Fatal(err) } @@ -182,7 +193,7 @@ func TestProvidesExpire(t *testing.T) { // Stop to prevent data races p.Process().Close() - if p.providers.Len() != 0 { + if p.cache.Len() != 0 { t.Fatal("providers map not cleaned up") }
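
For reviewers who want to exercise the refactored API end to end, here is a minimal usage sketch in the spirit of providers_manager_test.go. The import path for the providers package and the MutexWrap/MapDatastore wiring are assumptions based on the test setup; the peer IDs and key are placeholders.

package main

import (
	"context"
	"fmt"

	ds "github.com/ipfs/go-datastore"
	dssync "github.com/ipfs/go-datastore/sync"
	"github.com/libp2p/go-libp2p-core/peer"

	// assumed module path for the package touched by this patch
	"github.com/libp2p/go-libp2p-kad-dht/providers"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Any ds.Batching implementation works; a mutex-wrapped in-memory
	// datastore keeps the example self-contained.
	dstore := dssync.MutexWrap(ds.NewMapDatastore())

	pm := providers.NewProviderManager(ctx, peer.ID("local"), dstore)
	defer pm.Process().Close()

	key := []byte("example-key")
	pm.AddProvider(ctx, key, peer.ID("remote-provider"))

	// GetProviders hands back the manager's own set; treat it as read-only.
	for _, prov := range pm.GetProviders(ctx, key) {
		fmt.Println(prov)
	}
}

Because every read and write funnels through the newprovs/getprovs channels into the single run goroutine, the cache and datastore need no locking; that is also why GetProviders warns that the returned set must not be modified.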