provider record keys can now be arbitrary byte arrays of up to 80 bytes, instead of only a multihash

Adin Schmahmann 2019-12-12 16:02:22 -05:00
parent a4b38eebee
commit 3e24f352aa
4 changed files with 46 additions and 53 deletions
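For orientation, here is a minimal sketch (not part of this commit) of why the 80-byte cap comfortably covers the keys IPFS actually publishes: a CID's sha2-256 multihash is 34 bytes (a two-byte varint header plus the 32-byte digest), and it is those raw multihash bytes, not the whole CID, that the DHT now stores provider records under.

package main

import (
	"fmt"

	"github.com/ipfs/go-cid"
	mh "github.com/multiformats/go-multihash"
)

func main() {
	// Build a CID the way IPFS content addressing does: hash the data,
	// then wrap the multihash in a CID.
	h, err := mh.Sum([]byte("hello provider records"), mh.SHA2_256, -1)
	if err != nil {
		panic(err)
	}
	c := cid.NewCidV1(cid.Raw, h)

	// The provider-record key is the raw multihash bytes of the CID.
	key := []byte(c.Hash())
	fmt.Printf("key length: %d bytes (limit is 80)\n", len(key)) // 34 bytes
}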

dht.go

@@ -374,18 +374,18 @@ func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (
 	}
 }
-func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, keyCid cid.Cid) (*pb.Message, error) {
-	eip := logger.EventBegin(ctx, "findProvidersSingle", p, keyCid)
+func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key cid.Cid) (*pb.Message, error) {
+	eip := logger.EventBegin(ctx, "findProvidersSingle", p, key)
 	defer eip.Done()
-	key := keyCid.Hash()
-	pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, key, 0)
+	keyMH := key.Hash()
+	pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, keyMH, 0)
 	resp, err := dht.sendRequest(ctx, p, pmes)
 	switch err {
 	case nil:
 		return resp, nil
 	case ErrReadTimeout:
-		logger.Warningf("read timeout: %s %s", p.Pretty(), key)
+		logger.Warningf("read timeout: %s %s", p.Pretty(), keyMH)
 		fallthrough
 	default:
 		eip.SetError(err)


@@ -12,8 +12,6 @@ import (
 	"github.com/libp2p/go-libp2p-core/peerstore"
 	pstore "github.com/libp2p/go-libp2p-peerstore"
-	mh "github.com/multiformats/go-multihash"
 	"github.com/gogo/protobuf/proto"
 	ds "github.com/ipfs/go-datastore"
 	u "github.com/ipfs/go-ipfs-util"
@@ -318,26 +316,26 @@ func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb.
 	logger.SetTag(ctx, "peer", p)
 	resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
-	_, h, err := mh.MHFromBytes(pmes.GetKey())
-	if err != nil {
-		return nil, err
+	key := pmes.GetKey()
+	if len(key) > 80 {
+		return nil, fmt.Errorf("handleGetProviders key size too large")
 	}
-	logger.SetTag(ctx, "key", h)
+	logger.SetTag(ctx, "key", key)
 	// debug logging niceness.
-	reqDesc := fmt.Sprintf("%s handleGetProviders(%s, %s): ", dht.self, p, h)
+	reqDesc := fmt.Sprintf("%s handleGetProviders(%s, %s): ", dht.self, p, key)
 	logger.Debugf("%s begin", reqDesc)
 	defer logger.Debugf("%s end", reqDesc)
 	// check if we have this value, to add ourselves as provider.
-	has, err := dht.datastore.Has(convertToDsKey(h))
+	has, err := dht.datastore.Has(convertToDsKey(key))
 	if err != nil && err != ds.ErrNotFound {
 		logger.Debugf("unexpected datastore error: %v\n", err)
 		has = false
 	}
 	// setup providers
-	providers := dht.providers.GetProviders(ctx, h)
+	providers := dht.providers.GetProviders(ctx, key)
 	if has {
 		providers = append(providers, dht.self)
 		logger.Debugf("%s have the value. added self as provider", reqDesc)
@@ -367,13 +365,13 @@ func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.M
 	defer func() { logger.FinishWithErr(ctx, _err) }()
 	logger.SetTag(ctx, "peer", p)
-	_, h, err := mh.MHFromBytes(pmes.GetKey())
-	if err != nil {
-		return nil, err
+	key := pmes.GetKey()
+	if len(key) > 80 {
+		return nil, fmt.Errorf("handleAddProviders key size too large")
 	}
-	logger.SetTag(ctx, "key", h)
+	logger.SetTag(ctx, "key", key)
-	logger.Debugf("%s adding %s as a provider for '%s'\n", dht.self, p, h)
+	logger.Debugf("%s adding %s as a provider for '%s'\n", dht.self, p, key)
 	// add provider should use the address given in the message
 	pinfos := pb.PBPeersToPeerInfos(pmes.GetProviderPeers())
@@ -390,12 +388,12 @@ func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.M
 			continue
 		}
-		logger.Debugf("received provider %s for %s (addrs: %s)", p, h, pi.Addrs)
+		logger.Debugf("received provider %s for %s (addrs: %s)", p, key, pi.Addrs)
 		if pi.ID != dht.self { // don't add own addrs.
 			// add the received addresses to our peerstore.
 			dht.peerstore.AddAddrs(pi.ID, pi.Addrs, peerstore.ProviderAddrTTL)
 		}
-		dht.providers.AddProvider(ctx, h, p)
+		dht.providers.AddProvider(ctx, key, p)
 	}
 	return nil, nil

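The two handler hunks above share one idea: instead of requiring the key to parse as a multihash, they accept any byte string and only bound its size. A minimal sketch of that guard in isolation (the helper name and package are hypothetical and not part of the commit; the 80-byte cap is taken from the diff):

package dhtkeys // hypothetical package, for illustration only

import "fmt"

// maxProviderKeySize mirrors the bound the handlers above enforce.
const maxProviderKeySize = 80

// checkProviderKey accepts any opaque byte string up to 80 bytes,
// multihash or not, matching the new handler behaviour.
func checkProviderKey(key []byte) error {
	if len(key) > maxProviderKeySize {
		return fmt.Errorf("provider record key size too large: %d bytes", len(key))
	}
	return nil
}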

@@ -9,8 +9,6 @@ import (
 	"github.com/libp2p/go-libp2p-core/peer"
-	mh "github.com/multiformats/go-multihash"
 	lru "github.com/hashicorp/golang-lru/simplelru"
 	ds "github.com/ipfs/go-datastore"
 	autobatch "github.com/ipfs/go-datastore/autobatch"
@@ -48,12 +46,12 @@ type providerSet struct {
 }
 type addProv struct {
-	k   mh.Multihash
+	k   []byte
 	val peer.ID
 }
 type getProv struct {
-	k    mh.Multihash
+	k    []byte
 	resp chan []peer.ID
 }
@@ -77,7 +75,7 @@ func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Batching)
 const providersKeyPrefix = "/providers/"
-func mkProvKey(k mh.Multihash) string {
+func mkProvKey(k []byte) string {
 	return providersKeyPrefix + base32.RawStdEncoding.EncodeToString(k)
 }
@@ -85,7 +83,7 @@ func (pm *ProviderManager) Process() goprocess.Process {
 	return pm.proc
 }
-func (pm *ProviderManager) providersForKey(k mh.Multihash) ([]peer.ID, error) {
+func (pm *ProviderManager) providersForKey(k []byte) ([]peer.ID, error) {
 	pset, err := pm.getProvSet(k)
 	if err != nil {
 		return nil, err
@@ -93,7 +91,7 @@ func (pm *ProviderManager) providersForKey(k mh.Multihash) ([]peer.ID, error) {
 	return pset.providers, nil
 }
-func (pm *ProviderManager) getProvSet(k mh.Multihash) (*providerSet, error) {
+func (pm *ProviderManager) getProvSet(k []byte) (*providerSet, error) {
 	cached, ok := pm.providers.Get(string(k))
 	if ok {
 		return cached.(*providerSet), nil
@@ -111,7 +109,7 @@ func (pm *ProviderManager) getProvSet(k mh.Multihash) (*providerSet, error) {
 	return pset, nil
 }
-func loadProvSet(dstore ds.Datastore, k mh.Multihash) (*providerSet, error) {
+func loadProvSet(dstore ds.Datastore, k []byte) (*providerSet, error) {
 	res, err := dstore.Query(dsq.Query{Prefix: mkProvKey(k)})
 	if err != nil {
 		return nil, err
@@ -175,7 +173,7 @@ func readTimeValue(data []byte) (time.Time, error) {
 	return time.Unix(0, nsec), nil
 }
-func (pm *ProviderManager) addProv(k mh.Multihash, p peer.ID) error {
+func (pm *ProviderManager) addProv(k []byte, p peer.ID) error {
 	now := time.Now()
 	if provs, ok := pm.providers.Get(string(k)); ok {
 		provs.(*providerSet).setVal(p, now)
@@ -184,11 +182,11 @@ func (pm *ProviderManager) addProv(k mh.Multihash, p peer.ID) error {
 	return writeProviderEntry(pm.dstore, k, p, now)
 }
-func mkProvKeyFor(k mh.Multihash, p peer.ID) string {
+func mkProvKeyFor(k []byte, p peer.ID) string {
 	return mkProvKey(k) + "/" + base32.RawStdEncoding.EncodeToString([]byte(p))
 }
-func writeProviderEntry(dstore ds.Datastore, k mh.Multihash, p peer.ID, t time.Time) error {
+func writeProviderEntry(dstore ds.Datastore, k []byte, p peer.ID, t time.Time) error {
 	dsk := mkProvKeyFor(k, p)
 	buf := make([]byte, 16)
@@ -301,7 +299,7 @@ func (pm *ProviderManager) run(proc goprocess.Process) {
 }
 // AddProvider adds a provider.
-func (pm *ProviderManager) AddProvider(ctx context.Context, k mh.Multihash, val peer.ID) {
+func (pm *ProviderManager) AddProvider(ctx context.Context, k []byte, val peer.ID) {
 	prov := &addProv{
 		k:   k,
 		val: val,
@@ -314,7 +312,7 @@ func (pm *ProviderManager) AddProvider(ctx context.Context, k mh.Multihash, val
 // GetProviders returns the set of providers for the given key.
 // This method _does not_ copy the set. Do not modify it.
-func (pm *ProviderManager) GetProviders(ctx context.Context, k mh.Multihash) []peer.ID {
+func (pm *ProviderManager) GetProviders(ctx context.Context, k []byte) []peer.ID {
 	gp := &getProv{
 		k:    k,
 		resp: make(chan []peer.ID, 1), // buffered to prevent sender from blocking

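With the signatures above, the provider manager keys on plain byte slices. A usage sketch under stated assumptions: it presumes NewProviderManager at this point still takes (ctx, local peer ID, batching datastore) and returns just the manager, and it uses a throwaway peer ID and in-memory datastore purely for illustration.

package main

import (
	"context"
	"fmt"

	ds "github.com/ipfs/go-datastore"
	dssync "github.com/ipfs/go-datastore/sync"
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-kad-dht/providers"
)

func main() {
	ctx := context.Background()

	// In-memory datastore; a real node would use its persistent datastore.
	dstore := dssync.MutexWrap(ds.NewMapDatastore())

	// Throwaway peer ID for the sketch; peer.ID is a string type.
	self := peer.ID("local-peer-for-example")

	pm := providers.NewProviderManager(ctx, self, dstore)

	// Keys are now arbitrary byte strings (up to 80 bytes), not multihashes.
	key := []byte("any-opaque-provider-key")
	pm.AddProvider(ctx, key, self)

	fmt.Println("providers:", pm.GetProviders(ctx, key))
}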

@@ -13,8 +13,6 @@ import (
 	"github.com/libp2p/go-libp2p-core/peerstore"
 	"github.com/libp2p/go-libp2p-core/routing"
-	mh "github.com/multiformats/go-multihash"
 	"github.com/ipfs/go-cid"
 	u "github.com/ipfs/go-ipfs-util"
 	logging "github.com/ipfs/go-log"
@@ -413,12 +411,12 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, nvals int) (<-cha
 // locations of the value, similarly to Coral and Mainline DHT.
 // Provide makes this node announce that it can provide a value for the given key
-func (dht *IpfsDHT) Provide(ctx context.Context, keyCid cid.Cid, brdcst bool) (err error) {
+func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err error) {
 	if !dht.enableProviders {
 		return routing.ErrNotSupported
 	}
-	eip := logger.EventBegin(ctx, "Provide", keyCid, logging.LoggableMap{"broadcast": brdcst})
-	key := keyCid.Hash()
+	eip := logger.EventBegin(ctx, "Provide", key, logging.LoggableMap{"broadcast": brdcst})
+	keyMH := key.Hash()
 	defer func() {
 		if err != nil {
 			eip.SetError(err)
@@ -427,7 +425,7 @@ func (dht *IpfsDHT) Provide(ctx context.Context, keyCid cid.Cid, brdcst bool) (e
 	}()
 	// add self locally
-	dht.providers.AddProvider(ctx, key, dht.self)
+	dht.providers.AddProvider(ctx, keyMH, dht.self)
 	if !brdcst {
 		return nil
 	}
@@ -453,12 +451,12 @@ func (dht *IpfsDHT) Provide(ctx context.Context, keyCid cid.Cid, brdcst bool) (e
 		defer cancel()
 	}
-	peers, err := dht.GetClosestPeers(closerCtx, string(key))
+	peers, err := dht.GetClosestPeers(closerCtx, string(keyMH))
 	if err != nil {
 		return err
 	}
-	mes, err := dht.makeProvRecord(key)
+	mes, err := dht.makeProvRecord(keyMH)
 	if err != nil {
 		return err
 	}
@@ -468,7 +466,7 @@ func (dht *IpfsDHT) Provide(ctx context.Context, keyCid cid.Cid, brdcst bool) (e
 		wg.Add(1)
 		go func(p peer.ID) {
 			defer wg.Done()
-			logger.Debugf("putProvider(%s, %s)", key, p)
+			logger.Debugf("putProvider(%s, %s)", keyMH, p)
 			err := dht.sendMessage(ctx, p, mes)
 			if err != nil {
 				logger.Debug(err)
@@ -478,7 +476,7 @@ func (dht *IpfsDHT) Provide(ctx context.Context, keyCid cid.Cid, brdcst bool) (e
 	wg.Wait()
 	return nil
 }
-func (dht *IpfsDHT) makeProvRecord(key mh.Multihash) (*pb.Message, error) {
+func (dht *IpfsDHT) makeProvRecord(key []byte) (*pb.Message, error) {
 	pi := peer.AddrInfo{
 		ID:    dht.self,
 		Addrs: dht.host.Addrs(),
@@ -523,14 +521,13 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count i
 	return peerOut
 }
-func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, keyCid cid.Cid, count int, peerOut chan peer.AddrInfo) {
-	defer logger.EventBegin(ctx, "findProvidersAsync", keyCid).Done()
+func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key cid.Cid, count int, peerOut chan peer.AddrInfo) {
+	defer logger.EventBegin(ctx, "findProvidersAsync", key).Done()
 	defer close(peerOut)
-	key := keyCid.Hash()
+	keyMH := key.Hash()
 	ps := peer.NewLimitedSet(count)
-	provs := dht.providers.GetProviders(ctx, key)
+	provs := dht.providers.GetProviders(ctx, keyMH)
 	for _, p := range provs {
 		// NOTE: Assuming that this list of peers is unique
 		if ps.TryAdd(p) {
@@ -549,7 +546,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, keyCid cid.Ci
 		}
 	}
-	peers := dht.routingTable.NearestPeers(kb.ConvertKey(string(key)), AlphaValue)
+	peers := dht.routingTable.NearestPeers(kb.ConvertKey(string(keyMH)), AlphaValue)
 	if len(peers) == 0 {
 		routing.PublishQueryEvent(ctx, &routing.QueryEvent{
 			Type: routing.QueryError,
@@ -560,12 +557,12 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, keyCid cid.Ci
 	// setup the Query
 	parent := ctx
-	query := dht.newQuery(string(key), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
+	query := dht.newQuery(string(keyMH), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
 		routing.PublishQueryEvent(parent, &routing.QueryEvent{
 			Type: routing.SendingQuery,
 			ID:   p,
 		})
-		pmes, err := dht.findProvidersSingle(ctx, p, keyCid)
+		pmes, err := dht.findProvidersSingle(ctx, p, key)
 		if err != nil {
 			return nil, err
 		}
@@ -628,7 +625,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, keyCid cid.Ci
 	}
 	// refresh the cpl for this key after the query is run
-	dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertKey(string(key)), time.Now())
+	dht.routingTable.ResetCplRefreshedAtForID(kb.ConvertKey(string(keyMH)), time.Now())
 }
 // FindPeer searches for a peer with given ID.
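Note that the public routing API is unchanged by this commit: Provide and FindProvidersAsync still take a cid.Cid and derive the multihash key internally (keyMH := key.Hash()). A hedged caller-side sketch, assuming an already-constructed *IpfsDHT:

package example

import (
	"context"
	"fmt"

	"github.com/ipfs/go-cid"
	dht "github.com/libp2p/go-libp2p-kad-dht"
)

// announceAndFind announces c to the DHT and then streams back other
// providers. Callers keep passing CIDs; only the internal provider-record
// key changed to the CID's raw multihash bytes.
func announceAndFind(ctx context.Context, d *dht.IpfsDHT, c cid.Cid) error {
	if err := d.Provide(ctx, c, true); err != nil {
		return err
	}
	for pi := range d.FindProvidersAsync(ctx, c, 5) {
		fmt.Println("found provider:", pi.ID)
	}
	return nil
}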