mirror of
https://github.com/fluencelabs/go-libp2p-kad-dht
synced 2025-04-24 22:32:13 +00:00
gx update and fix code to use new Cid type
This commit is contained in:
parent
ac1ca48cc5
commit
a303c45371
2
dht.go
2
dht.go
@ -306,7 +306,7 @@ func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (
|
||||
}
|
||||
}
|
||||
|
||||
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key *cid.Cid) (*pb.Message, error) {
|
||||
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key cid.Cid) (*pb.Message, error) {
|
||||
eip := log.EventBegin(ctx, "findProvidersSingle", p, key)
|
||||
defer eip.Done()
|
||||
|
||||
|
@ -30,7 +30,7 @@ import (
|
||||
)
|
||||
|
||||
var testCaseValues = map[string][]byte{}
|
||||
var testCaseCids []*cid.Cid
|
||||
var testCaseCids []cid.Cid
|
||||
|
||||
func init() {
|
||||
for i := 0; i < 100; i++ {
|
||||
@ -794,7 +794,7 @@ func TestProvidesMany(t *testing.T) {
|
||||
defer cancel()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
getProvider := func(dht *IpfsDHT, k *cid.Cid) {
|
||||
getProvider := func(dht *IpfsDHT, k cid.Cid) {
|
||||
defer wg.Done()
|
||||
|
||||
expected := providers[k.KeyString()]
|
||||
|
@ -84,9 +84,9 @@
|
||||
},
|
||||
{
|
||||
"author": "whyrusleeping",
|
||||
"hash": "QmY9JUvS8kbgao3XbPh6WAV3ChE2nxGKhcGTHiwMC4gmcU",
|
||||
"hash": "QmdKS5YtmuSWKuLLgbHG176mS3VX3AKiyVmaaiAfvgcuch",
|
||||
"name": "go-libp2p-routing",
|
||||
"version": "2.4.10"
|
||||
"version": "2.5.0"
|
||||
},
|
||||
{
|
||||
"author": "whyrusleeping",
|
||||
@ -96,9 +96,9 @@
|
||||
},
|
||||
{
|
||||
"author": "whyrusleeping",
|
||||
"hash": "QmZFbDTY9jfSBms2MchvYM9oYRbAF19K7Pby47yDBfpPrb",
|
||||
"hash": "QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7",
|
||||
"name": "go-cid",
|
||||
"version": "0.8.0"
|
||||
"version": "0.9.0"
|
||||
},
|
||||
{
|
||||
"author": "whyrusleeping",
|
||||
|
@ -48,12 +48,12 @@ type providerSet struct {
|
||||
}
|
||||
|
||||
type addProv struct {
|
||||
k *cid.Cid
|
||||
k cid.Cid
|
||||
val peer.ID
|
||||
}
|
||||
|
||||
type getProv struct {
|
||||
k *cid.Cid
|
||||
k cid.Cid
|
||||
resp chan []peer.ID
|
||||
}
|
||||
|
||||
@ -77,7 +77,7 @@ func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Batching)
|
||||
|
||||
const providersKeyPrefix = "/providers/"
|
||||
|
||||
func mkProvKey(k *cid.Cid) string {
|
||||
func mkProvKey(k cid.Cid) string {
|
||||
return providersKeyPrefix + base32.RawStdEncoding.EncodeToString(k.Bytes())
|
||||
}
|
||||
|
||||
@ -85,7 +85,7 @@ func (pm *ProviderManager) Process() goprocess.Process {
|
||||
return pm.proc
|
||||
}
|
||||
|
||||
func (pm *ProviderManager) providersForKey(k *cid.Cid) ([]peer.ID, error) {
|
||||
func (pm *ProviderManager) providersForKey(k cid.Cid) ([]peer.ID, error) {
|
||||
pset, err := pm.getProvSet(k)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -93,7 +93,7 @@ func (pm *ProviderManager) providersForKey(k *cid.Cid) ([]peer.ID, error) {
|
||||
return pset.providers, nil
|
||||
}
|
||||
|
||||
func (pm *ProviderManager) getProvSet(k *cid.Cid) (*providerSet, error) {
|
||||
func (pm *ProviderManager) getProvSet(k cid.Cid) (*providerSet, error) {
|
||||
cached, ok := pm.providers.Get(k.KeyString())
|
||||
if ok {
|
||||
return cached.(*providerSet), nil
|
||||
@ -111,7 +111,7 @@ func (pm *ProviderManager) getProvSet(k *cid.Cid) (*providerSet, error) {
|
||||
return pset, nil
|
||||
}
|
||||
|
||||
func loadProvSet(dstore ds.Datastore, k *cid.Cid) (*providerSet, error) {
|
||||
func loadProvSet(dstore ds.Datastore, k cid.Cid) (*providerSet, error) {
|
||||
res, err := dstore.Query(dsq.Query{Prefix: mkProvKey(k)})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -161,7 +161,7 @@ func readTimeValue(i interface{}) (time.Time, error) {
|
||||
return time.Unix(0, nsec), nil
|
||||
}
|
||||
|
||||
func (pm *ProviderManager) addProv(k *cid.Cid, p peer.ID) error {
|
||||
func (pm *ProviderManager) addProv(k cid.Cid, p peer.ID) error {
|
||||
iprovs, ok := pm.providers.Get(k.KeyString())
|
||||
if !ok {
|
||||
stored, err := loadProvSet(pm.dstore, k)
|
||||
@ -178,7 +178,7 @@ func (pm *ProviderManager) addProv(k *cid.Cid, p peer.ID) error {
|
||||
return writeProviderEntry(pm.dstore, k, p, now)
|
||||
}
|
||||
|
||||
func writeProviderEntry(dstore ds.Datastore, k *cid.Cid, p peer.ID, t time.Time) error {
|
||||
func writeProviderEntry(dstore ds.Datastore, k cid.Cid, p peer.ID, t time.Time) error {
|
||||
dsk := mkProvKey(k) + "/" + base32.RawStdEncoding.EncodeToString([]byte(p))
|
||||
|
||||
buf := make([]byte, 16)
|
||||
@ -187,7 +187,7 @@ func writeProviderEntry(dstore ds.Datastore, k *cid.Cid, p peer.ID, t time.Time)
|
||||
return dstore.Put(ds.NewKey(dsk), buf[:n])
|
||||
}
|
||||
|
||||
func (pm *ProviderManager) deleteProvSet(k *cid.Cid) error {
|
||||
func (pm *ProviderManager) deleteProvSet(k cid.Cid) error {
|
||||
pm.providers.Remove(k.KeyString())
|
||||
|
||||
res, err := pm.dstore.Query(dsq.Query{
|
||||
@ -212,7 +212,7 @@ func (pm *ProviderManager) deleteProvSet(k *cid.Cid) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (pm *ProviderManager) getProvKeys() (func() (*cid.Cid, bool), error) {
|
||||
func (pm *ProviderManager) getProvKeys() (func() (cid.Cid, bool), error) {
|
||||
res, err := pm.dstore.Query(dsq.Query{
|
||||
KeysOnly: true,
|
||||
Prefix: providersKeyPrefix,
|
||||
@ -221,7 +221,7 @@ func (pm *ProviderManager) getProvKeys() (func() (*cid.Cid, bool), error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
iter := func() (*cid.Cid, bool) {
|
||||
iter := func() (cid.Cid, bool) {
|
||||
for e := range res.Next() {
|
||||
parts := strings.Split(e.Key, "/")
|
||||
if len(parts) != 4 {
|
||||
@ -242,7 +242,7 @@ func (pm *ProviderManager) getProvKeys() (func() (*cid.Cid, bool), error) {
|
||||
|
||||
return c, true
|
||||
}
|
||||
return nil, false
|
||||
return cid.Cid{}, false
|
||||
}
|
||||
|
||||
return iter, nil
|
||||
@ -309,7 +309,7 @@ func (pm *ProviderManager) run() {
|
||||
}
|
||||
}
|
||||
|
||||
func (pm *ProviderManager) AddProvider(ctx context.Context, k *cid.Cid, val peer.ID) {
|
||||
func (pm *ProviderManager) AddProvider(ctx context.Context, k cid.Cid, val peer.ID) {
|
||||
prov := &addProv{
|
||||
k: k,
|
||||
val: val,
|
||||
@ -320,7 +320,7 @@ func (pm *ProviderManager) AddProvider(ctx context.Context, k *cid.Cid, val peer
|
||||
}
|
||||
}
|
||||
|
||||
func (pm *ProviderManager) GetProviders(ctx context.Context, k *cid.Cid) []peer.ID {
|
||||
func (pm *ProviderManager) GetProviders(ctx context.Context, k cid.Cid) []peer.ID {
|
||||
gp := &getProv{
|
||||
k: k,
|
||||
resp: make(chan []peer.ID, 1), // buffered to prevent sender from blocking
|
||||
|
@ -45,7 +45,7 @@ func TestProvidersDatastore(t *testing.T) {
|
||||
defer p.proc.Close()
|
||||
|
||||
friend := peer.ID("friend")
|
||||
var cids []*cid.Cid
|
||||
var cids []cid.Cid
|
||||
for i := 0; i < 100; i++ {
|
||||
c := cid.NewCidV0(u.Hash([]byte(fmt.Sprint(i))))
|
||||
cids = append(cids, c)
|
||||
@ -123,7 +123,7 @@ func TestProvidesExpire(t *testing.T) {
|
||||
p := NewProviderManager(ctx, mid, ds.NewMapDatastore())
|
||||
|
||||
peers := []peer.ID{"a", "b"}
|
||||
var cids []*cid.Cid
|
||||
var cids []cid.Cid
|
||||
for i := 0; i < 10; i++ {
|
||||
c := cid.NewCidV0(u.Hash([]byte(fmt.Sprint(i))))
|
||||
cids = append(cids, c)
|
||||
@ -199,7 +199,7 @@ func TestLargeProvidersSet(t *testing.T) {
|
||||
p := NewProviderManager(ctx, mid, lds)
|
||||
defer p.proc.Close()
|
||||
|
||||
var cids []*cid.Cid
|
||||
var cids []cid.Cid
|
||||
for i := 0; i < 1000; i++ {
|
||||
c := cid.NewCidV0(u.Hash([]byte(fmt.Sprint(i))))
|
||||
cids = append(cids, c)
|
||||
|
10
routing.go
10
routing.go
@ -395,7 +395,7 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, nvals int) (<-cha
|
||||
// locations of the value, similarly to Coral and Mainline DHT.
|
||||
|
||||
// Provide makes this node announce that it can provide a value for the given key
|
||||
func (dht *IpfsDHT) Provide(ctx context.Context, key *cid.Cid, brdcst bool) (err error) {
|
||||
func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err error) {
|
||||
eip := log.EventBegin(ctx, "Provide", key, logging.LoggableMap{"broadcast": brdcst})
|
||||
defer func() {
|
||||
if err != nil {
|
||||
@ -435,7 +435,7 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key *cid.Cid, brdcst bool) (err
|
||||
wg.Wait()
|
||||
return nil
|
||||
}
|
||||
func (dht *IpfsDHT) makeProvRecord(skey *cid.Cid) (*pb.Message, error) {
|
||||
func (dht *IpfsDHT) makeProvRecord(skey cid.Cid) (*pb.Message, error) {
|
||||
pi := pstore.PeerInfo{
|
||||
ID: dht.self,
|
||||
Addrs: dht.host.Addrs(),
|
||||
@ -453,7 +453,7 @@ func (dht *IpfsDHT) makeProvRecord(skey *cid.Cid) (*pb.Message, error) {
|
||||
}
|
||||
|
||||
// FindProviders searches until the context expires.
|
||||
func (dht *IpfsDHT) FindProviders(ctx context.Context, c *cid.Cid) ([]pstore.PeerInfo, error) {
|
||||
func (dht *IpfsDHT) FindProviders(ctx context.Context, c cid.Cid) ([]pstore.PeerInfo, error) {
|
||||
var providers []pstore.PeerInfo
|
||||
for p := range dht.FindProvidersAsync(ctx, c, KValue) {
|
||||
providers = append(providers, p)
|
||||
@ -464,14 +464,14 @@ func (dht *IpfsDHT) FindProviders(ctx context.Context, c *cid.Cid) ([]pstore.Pee
|
||||
// FindProvidersAsync is the same thing as FindProviders, but returns a channel.
|
||||
// Peers will be returned on the channel as soon as they are found, even before
|
||||
// the search query completes.
|
||||
func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key *cid.Cid, count int) <-chan pstore.PeerInfo {
|
||||
func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan pstore.PeerInfo {
|
||||
log.Event(ctx, "findProviders", key)
|
||||
peerOut := make(chan pstore.PeerInfo, count)
|
||||
go dht.findProvidersAsyncRoutine(ctx, key, count, peerOut)
|
||||
return peerOut
|
||||
}
|
||||
|
||||
func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key *cid.Cid, count int, peerOut chan pstore.PeerInfo) {
|
||||
func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key cid.Cid, count int, peerOut chan pstore.PeerInfo) {
|
||||
defer log.EventBegin(ctx, "findProvidersAsync", key).Done()
|
||||
defer close(peerOut)
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user