mirror of
https://github.com/fluencelabs/go-libp2p-kad-dht
synced 2025-04-24 22:32:13 +00:00
feat: Expose the ProviderManager through the IpfsDht struct (#491)
* feat: Expose the ProviderManager through the IpfsDht struct * feat: expose providers prefix so that we do not have to hard code it in other places Co-authored-by: Aarsh Shah <aarshkshah1992@gmail.com>
This commit is contained in:
parent
6e748c20d8
commit
ad27ebcfa7
7
dht.go
7
dht.go
@ -49,7 +49,8 @@ type IpfsDHT struct {
|
|||||||
datastore ds.Datastore // Local data
|
datastore ds.Datastore // Local data
|
||||||
|
|
||||||
routingTable *kb.RoutingTable // Array of routing tables for differently distanced nodes
|
routingTable *kb.RoutingTable // Array of routing tables for differently distanced nodes
|
||||||
providers *providers.ProviderManager
|
// ProviderManager stores & manages the provider records for this Dht peer.
|
||||||
|
ProviderManager *providers.ProviderManager
|
||||||
|
|
||||||
birth time.Time // When this peer started up
|
birth time.Time // When this peer started up
|
||||||
|
|
||||||
@ -111,7 +112,7 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er
|
|||||||
// register for network notifs.
|
// register for network notifs.
|
||||||
dht.host.Network().Notify((*netNotifiee)(dht))
|
dht.host.Network().Notify((*netNotifiee)(dht))
|
||||||
|
|
||||||
dht.proc.AddChild(dht.providers.Process())
|
dht.proc.AddChild(dht.ProviderManager.Process())
|
||||||
dht.Validator = cfg.Validator
|
dht.Validator = cfg.Validator
|
||||||
|
|
||||||
if !cfg.Client {
|
if !cfg.Client {
|
||||||
@ -185,7 +186,7 @@ func makeDHT(ctx context.Context, h host.Host, cfg *opts.Options) *IpfsDHT {
|
|||||||
// the DHT context should be done when the process is closed
|
// the DHT context should be done when the process is closed
|
||||||
dht.ctx = goprocessctx.WithProcessClosing(ctxTags, dht.proc)
|
dht.ctx = goprocessctx.WithProcessClosing(ctxTags, dht.proc)
|
||||||
|
|
||||||
dht.providers = providers.NewProviderManager(dht.ctx, h.ID(), cfg.Datastore)
|
dht.ProviderManager = providers.NewProviderManager(dht.ctx, h.ID(), cfg.Datastore)
|
||||||
|
|
||||||
return dht
|
return dht
|
||||||
}
|
}
|
||||||
|
10
dht_test.go
10
dht_test.go
@ -30,7 +30,7 @@ import (
|
|||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
u "github.com/ipfs/go-ipfs-util"
|
u "github.com/ipfs/go-ipfs-util"
|
||||||
kb "github.com/libp2p/go-libp2p-kbucket"
|
kb "github.com/libp2p/go-libp2p-kbucket"
|
||||||
"github.com/libp2p/go-libp2p-record"
|
record "github.com/libp2p/go-libp2p-record"
|
||||||
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
|
swarmt "github.com/libp2p/go-libp2p-swarm/testing"
|
||||||
"github.com/libp2p/go-libp2p-testing/ci"
|
"github.com/libp2p/go-libp2p-testing/ci"
|
||||||
travisci "github.com/libp2p/go-libp2p-testing/ci/travis"
|
travisci "github.com/libp2p/go-libp2p-testing/ci/travis"
|
||||||
@ -677,7 +677,7 @@ func TestLocalProvides(t *testing.T) {
|
|||||||
|
|
||||||
for _, c := range testCaseCids {
|
for _, c := range testCaseCids {
|
||||||
for i := 0; i < 3; i++ {
|
for i := 0; i < 3; i++ {
|
||||||
provs := dhts[i].providers.GetProviders(ctx, c.Hash())
|
provs := dhts[i].ProviderManager.GetProviders(ctx, c.Hash())
|
||||||
if len(provs) > 0 {
|
if len(provs) > 0 {
|
||||||
t.Fatal("shouldnt know this")
|
t.Fatal("shouldnt know this")
|
||||||
}
|
}
|
||||||
@ -1418,7 +1418,7 @@ func TestClientModeConnect(t *testing.T) {
|
|||||||
|
|
||||||
c := testCaseCids[0]
|
c := testCaseCids[0]
|
||||||
p := peer.ID("TestPeer")
|
p := peer.ID("TestPeer")
|
||||||
a.providers.AddProvider(ctx, c.Hash(), p)
|
a.ProviderManager.AddProvider(ctx, c.Hash(), p)
|
||||||
time.Sleep(time.Millisecond * 5) // just in case...
|
time.Sleep(time.Millisecond * 5) // just in case...
|
||||||
|
|
||||||
provs, err := b.FindProviders(ctx, c)
|
provs, err := b.FindProviders(ctx, c)
|
||||||
@ -1633,7 +1633,7 @@ func TestProvideDisabled(t *testing.T) {
|
|||||||
if err != routing.ErrNotSupported {
|
if err != routing.ErrNotSupported {
|
||||||
t.Fatal("get should have failed on node B")
|
t.Fatal("get should have failed on node B")
|
||||||
}
|
}
|
||||||
provs := dhtB.providers.GetProviders(ctx, kHash)
|
provs := dhtB.ProviderManager.GetProviders(ctx, kHash)
|
||||||
if len(provs) != 0 {
|
if len(provs) != 0 {
|
||||||
t.Fatal("node B should not have found local providers")
|
t.Fatal("node B should not have found local providers")
|
||||||
}
|
}
|
||||||
@ -1649,7 +1649,7 @@ func TestProvideDisabled(t *testing.T) {
|
|||||||
t.Fatal("node A should not have found providers")
|
t.Fatal("node A should not have found providers")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
provAddrs := dhtA.providers.GetProviders(ctx, kHash)
|
provAddrs := dhtA.ProviderManager.GetProviders(ctx, kHash)
|
||||||
if len(provAddrs) != 0 {
|
if len(provAddrs) != 0 {
|
||||||
t.Fatal("node A should not have found local providers")
|
t.Fatal("node A should not have found local providers")
|
||||||
}
|
}
|
||||||
|
@ -335,7 +335,7 @@ func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb.
|
|||||||
}
|
}
|
||||||
|
|
||||||
// setup providers
|
// setup providers
|
||||||
providers := dht.providers.GetProviders(ctx, key)
|
providers := dht.ProviderManager.GetProviders(ctx, key)
|
||||||
if has {
|
if has {
|
||||||
providers = append(providers, dht.self)
|
providers = append(providers, dht.self)
|
||||||
logger.Debugf("%s have the value. added self as provider", reqDesc)
|
logger.Debugf("%s have the value. added self as provider", reqDesc)
|
||||||
@ -393,7 +393,7 @@ func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.M
|
|||||||
// add the received addresses to our peerstore.
|
// add the received addresses to our peerstore.
|
||||||
dht.peerstore.AddAddrs(pi.ID, pi.Addrs, peerstore.ProviderAddrTTL)
|
dht.peerstore.AddAddrs(pi.ID, pi.Addrs, peerstore.ProviderAddrTTL)
|
||||||
}
|
}
|
||||||
dht.providers.AddProvider(ctx, key, p)
|
dht.ProviderManager.AddProvider(ctx, key, p)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, nil
|
return nil, nil
|
||||||
|
@ -73,10 +73,12 @@ func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Batching)
|
|||||||
return pm
|
return pm
|
||||||
}
|
}
|
||||||
|
|
||||||
const providersKeyPrefix = "/providers/"
|
// ProvidersKeyPrefix is the prefix/namespace for ALL provider record
|
||||||
|
// keys stored in the data store.
|
||||||
|
const ProvidersKeyPrefix = "/providers/"
|
||||||
|
|
||||||
func mkProvKey(k []byte) string {
|
func mkProvKey(k []byte) string {
|
||||||
return providersKeyPrefix + base32.RawStdEncoding.EncodeToString(k)
|
return ProvidersKeyPrefix + base32.RawStdEncoding.EncodeToString(k)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (pm *ProviderManager) Process() goprocess.Process {
|
func (pm *ProviderManager) Process() goprocess.Process {
|
||||||
@ -283,7 +285,7 @@ func (pm *ProviderManager) run(proc goprocess.Process) {
|
|||||||
|
|
||||||
// Now, kick off a GC of the datastore.
|
// Now, kick off a GC of the datastore.
|
||||||
q, err := pm.dstore.Query(dsq.Query{
|
q, err := pm.dstore.Query(dsq.Query{
|
||||||
Prefix: providersKeyPrefix,
|
Prefix: ProvidersKeyPrefix,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("provider record GC query failed: ", err)
|
log.Error("provider record GC query failed: ", err)
|
||||||
|
@ -186,7 +186,7 @@ func TestProvidesExpire(t *testing.T) {
|
|||||||
t.Fatal("providers map not cleaned up")
|
t.Fatal("providers map not cleaned up")
|
||||||
}
|
}
|
||||||
|
|
||||||
res, err := ds.Query(dsq.Query{Prefix: providersKeyPrefix})
|
res, err := ds.Query(dsq.Query{Prefix: ProvidersKeyPrefix})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -426,7 +426,7 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
// add self locally
|
// add self locally
|
||||||
dht.providers.AddProvider(ctx, keyMH, dht.self)
|
dht.ProviderManager.AddProvider(ctx, keyMH, dht.self)
|
||||||
if !brdcst {
|
if !brdcst {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -528,7 +528,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash
|
|||||||
defer close(peerOut)
|
defer close(peerOut)
|
||||||
|
|
||||||
ps := peer.NewLimitedSet(count)
|
ps := peer.NewLimitedSet(count)
|
||||||
provs := dht.providers.GetProviders(ctx, key)
|
provs := dht.ProviderManager.GetProviders(ctx, key)
|
||||||
for _, p := range provs {
|
for _, p := range provs {
|
||||||
// NOTE: Assuming that this list of peers is unique
|
// NOTE: Assuming that this list of peers is unique
|
||||||
if ps.TryAdd(p) {
|
if ps.TryAdd(p) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user