go-libp2p-kad-dht/routing.go
Adin Schmahmann fcbc5f9e7f
Asynchronous lookups (#498)
* feat(query): fully async implementation of Kademlia lookup. peers returned from the lookup are not guaranteed to be alive (i.e. we're only guaranteed to have dialed the closest beta peers to the target), but given stable and correct routing tables the expectation that most of the peers returned are alive is high.
* feat(query): add wrapper lookup followup function to followup after the lookup is completed and ensure that the closest k returned peers from a lookup have been queried even for beta < k
* refactor(query): modified the structure returned from lookups to be a useful subset of the full query state instead of the entire query state
* feat(options): beta parameter exposed as the Resiliency parameter
* feat(routing): do not mark the routing table as updated after a FindPeer query
* feat(routing): FindPeer can return addresses even if not Connected as long as it was either recently connected (CanConnect) or was discovered during the lookup
* feat(bootstrap): bootstrap logic now uses GetClosestPeers instead of FindPeer
* refactor(dht): stopFn no longer takes any state
* fix(test): changed GetClosestPeers test to only assume beta instead of k peers since that is now more appropriate given the query logic changes and that the routing tables in that test are bad, i.e. a ring network with arbitrary peerIDs

Co-authored-by: Petar Maymounkov <petarm@gmail.com>
Co-authored-by: Aarsh Shah <aarshkshah1992@gmail.com>
2020-03-24 11:17:48 -04:00

package dht
import (
"bytes"
"context"
"fmt"
"sync"
"time"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/routing"
"github.com/ipfs/go-cid"
u "github.com/ipfs/go-ipfs-util"
logging "github.com/ipfs/go-log"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
"github.com/libp2p/go-libp2p-kad-dht/qpeerset"
kb "github.com/libp2p/go-libp2p-kbucket"
record "github.com/libp2p/go-libp2p-record"
"github.com/multiformats/go-multihash"
)
// asyncQueryBuffer is the size of buffered channels in async queries. This
// buffer allows multiple queries to execute simultaneously, return their
// results and continue querying closer peers. Note that different query
// results will wait for the channel to drain.
var asyncQueryBuffer = 10
// This file implements the Routing interface for the IpfsDHT struct.
// Basic Put/Get
// PutValue adds the value corresponding to the given key.
// This is the top-level "Store" operation of the DHT.
func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts ...routing.Option) (err error) {
if !dht.enableValues {
return routing.ErrNotSupported
}
eip := logger.EventBegin(ctx, "PutValue")
defer func() {
eip.Append(loggableKey(key))
if err != nil {
eip.SetError(err)
}
eip.Done()
}()
logger.Debugf("PutValue %s", key)
// don't even allow local users to put bad values.
if err := dht.Validator.Validate(key, value); err != nil {
return err
}
old, err := dht.getLocal(key)
if err != nil {
// Means something is wrong with the datastore.
return err
}
// Check if we have an old value that's not the same as the new one.
if old != nil && !bytes.Equal(old.GetValue(), value) {
// Check to see if the new one is better.
i, err := dht.Validator.Select(key, [][]byte{value, old.GetValue()})
if err != nil {
return err
}
if i != 0 {
return fmt.Errorf("can't replace a newer value with an older value")
}
}
rec := record.MakePutRecord(key, value)
rec.TimeReceived = u.FormatRFC3339(time.Now())
err = dht.putLocal(key, rec)
if err != nil {
return err
}
pchan, err := dht.GetClosestPeers(ctx, key)
if err != nil {
return err
}
wg := sync.WaitGroup{}
for p := range pchan {
wg.Add(1)
go func(p peer.ID) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
defer wg.Done()
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
Type: routing.Value,
ID: p,
})
err := dht.putValueToPeer(ctx, p, rec)
if err != nil {
logger.Debugf("failed putting value to peer: %s", err)
}
}(p)
}
wg.Wait()
return nil
}
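// Illustrative sketch, not part of the original file: a typical
// PutValue/GetValue round trip through this API. The "/v/example" key and
// the quorum of 3 are hypothetical; a record.Validator must already be
// registered for the chosen key namespace.
func examplePutGet(ctx context.Context, dht *IpfsDHT) ([]byte, error) {
	// Store the record locally and at the closest peers found by the lookup.
	if err := dht.PutValue(ctx, "/v/example", []byte("hello")); err != nil {
		return nil, err
	}
	// Read it back, collecting responses until the quorum of 3 is reached.
	return dht.GetValue(ctx, "/v/example", Quorum(3))
}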
// RecvdVal stores a value and the peer from which we got the value.
type RecvdVal struct {
Val []byte
From peer.ID
}
// GetValue searches for the value corresponding to the given key.
func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...routing.Option) (_ []byte, err error) {
if !dht.enableValues {
return nil, routing.ErrNotSupported
}
eip := logger.EventBegin(ctx, "GetValue")
defer func() {
eip.Append(loggableKey(key))
if err != nil {
eip.SetError(err)
}
eip.Done()
}()
// apply defaultQuorum if relevant
var cfg routing.Options
if err := cfg.Apply(opts...); err != nil {
return nil, err
}
opts = append(opts, Quorum(getQuorum(&cfg, defaultQuorum)))
responses, err := dht.SearchValue(ctx, key, opts...)
if err != nil {
return nil, err
}
var best []byte
for r := range responses {
best = r
}
if ctx.Err() != nil {
return best, ctx.Err()
}
if best == nil {
return nil, routing.ErrNotFound
}
logger.Debugf("GetValue %v %v", key, best)
return best, nil
}
// SearchValue searches for the value corresponding to the given key and
// streams every better value it finds to the returned channel, closing the
// channel once the search completes.
func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing.Option) (<-chan []byte, error) {
if !dht.enableValues {
return nil, routing.ErrNotSupported
}
var cfg routing.Options
if err := cfg.Apply(opts...); err != nil {
return nil, err
}
responsesNeeded := 0
if !cfg.Offline {
responsesNeeded = getQuorum(&cfg, defaultQuorum)
}
stopCh := make(chan struct{})
valCh, lookupRes := dht.getValues(ctx, key, stopCh)
out := make(chan []byte)
go func() {
defer close(out)
best, peersWithBest, aborted := dht.searchValueQuorum(ctx, key, valCh, stopCh, out, responsesNeeded)
if best == nil || aborted {
return
}
updatePeers := make([]peer.ID, 0, dht.bucketSize)
select {
case l := <-lookupRes:
if l == nil {
return
}
for _, p := range l.peers {
if _, ok := peersWithBest[p]; !ok {
updatePeers = append(updatePeers, p)
}
}
case <-ctx.Done():
return
}
dht.updatePeerValues(dht.Context(), key, best, updatePeers)
}()
return out, nil
}
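// Illustrative sketch, not part of the original file: SearchValue streams
// every value that improves on the previous best, so a caller that wants the
// final answer keeps only the last value received before the channel closes.
func exampleSearchValue(ctx context.Context, dht *IpfsDHT) ([]byte, error) {
	ch, err := dht.SearchValue(ctx, "/v/example")
	if err != nil {
		return nil, err
	}
	var best []byte
	for v := range ch {
		best = v // each received value supersedes the previous one
	}
	if best == nil {
		return nil, routing.ErrNotFound
	}
	return best, nil
}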
func (dht *IpfsDHT) searchValueQuorum(ctx context.Context, key string, valCh <-chan RecvdVal, stopCh chan struct{},
out chan<- []byte, nvals int) ([]byte, map[peer.ID]struct{}, bool) {
numResponses := 0
return dht.processValues(ctx, key, valCh,
func(ctx context.Context, v RecvdVal, better bool) bool {
numResponses++
if better {
select {
case out <- v.Val:
case <-ctx.Done():
return false
}
}
if nvals > 0 && numResponses > nvals {
close(stopCh)
return true
}
return false
})
}
// GetValues gets nvals values corresponding to the given key.
func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []RecvdVal, err error) {
if !dht.enableValues {
return nil, routing.ErrNotSupported
}
eip := logger.EventBegin(ctx, "GetValues")
eip.Append(loggableKey(key))
defer eip.Done()
queryCtx, cancel := context.WithCancel(ctx)
defer cancel()
valCh, _ := dht.getValues(queryCtx, key, nil)
out := make([]RecvdVal, 0, nvals)
for val := range valCh {
out = append(out, val)
if len(out) == nvals {
cancel()
}
}
return out, ctx.Err()
}
func (dht *IpfsDHT) processValues(ctx context.Context, key string, vals <-chan RecvdVal,
newVal func(ctx context.Context, v RecvdVal, better bool) bool) (best []byte, peersWithBest map[peer.ID]struct{}, aborted bool) {
loop:
for {
if aborted {
return
}
select {
case v, ok := <-vals:
if !ok {
break loop
}
// Select best value
if best != nil {
if bytes.Equal(best, v.Val) {
peersWithBest[v.From] = struct{}{}
aborted = newVal(ctx, v, false)
continue
}
sel, err := dht.Validator.Select(key, [][]byte{best, v.Val})
if err != nil {
logger.Warning("Failed to select dht key: ", err)
continue
}
if sel != 1 {
aborted = newVal(ctx, v, false)
continue
}
}
peersWithBest = make(map[peer.ID]struct{})
peersWithBest[v.From] = struct{}{}
best = v.Val
aborted = newVal(ctx, v, true)
case <-ctx.Done():
return
}
}
return
}
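// To make the selection logic above concrete, here is a toy validator with
// the record.Validator method set (illustrative only, not used by this
// package). processValues calls Select(key, [][]byte{best, candidate}) and
// treats the candidate as better only when Select returns index 1.
type longestValueValidator struct{}

// Validate accepts every record; a real validator would check the key
// namespace and the payload.
func (longestValueValidator) Validate(key string, value []byte) error { return nil }

// Select returns the index of the best value; here, simply the longest one.
func (longestValueValidator) Select(key string, values [][]byte) (int, error) {
	best := 0
	for i, v := range values {
		if len(v) > len(values[best]) {
			best = i
		}
	}
	return best, nil
}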
func (dht *IpfsDHT) updatePeerValues(ctx context.Context, key string, val []byte, peers []peer.ID) {
fixupRec := record.MakePutRecord(key, val)
for _, p := range peers {
go func(p peer.ID) {
//TODO: Is this possible?
if p == dht.self {
err := dht.putLocal(key, fixupRec)
if err != nil {
logger.Error("Error correcting local dht entry:", err)
}
return
}
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()
err := dht.putValueToPeer(ctx, p, fixupRec)
if err != nil {
logger.Debug("Error correcting DHT entry: ", err)
}
}(p)
}
}
func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan struct{}) (<-chan RecvdVal, <-chan *lookupWithFollowupResult) {
valCh := make(chan RecvdVal, 1)
lookupResCh := make(chan *lookupWithFollowupResult, 1)
if rec, err := dht.getLocal(key); rec != nil && err == nil {
select {
case valCh <- RecvdVal{
Val: rec.GetValue(),
From: dht.self,
}:
case <-ctx.Done():
}
}
go func() {
defer close(valCh)
defer close(lookupResCh)
lookupRes, err := dht.runLookupWithFollowup(ctx, dht.d, key,
func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
// For DHT query command
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
Type: routing.SendingQuery,
ID: p,
})
rec, peers, err := dht.getValueOrPeers(ctx, p, key)
switch err {
case routing.ErrNotFound:
// in this case, they responded with nothing,
// still send a notification so listeners can know the
// request has completed 'successfully'
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
Type: routing.PeerResponse,
ID: p,
})
return nil, err
default:
return nil, err
case nil, errInvalidRecord:
// in either of these cases, we want to keep going
}
// TODO: What should happen if the record is invalid?
// Pre-existing code counted it towards the quorum, but should it?
if rec != nil && rec.GetValue() != nil {
rv := RecvdVal{
Val: rec.GetValue(),
From: p,
}
select {
case valCh <- rv:
case <-ctx.Done():
return nil, ctx.Err()
}
}
// For DHT query command
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
Type: routing.PeerResponse,
ID: p,
Responses: peers,
})
return peers, err
},
func() bool {
select {
case <-stopQuery:
return true
default:
return false
}
},
)
if err != nil {
return
}
lookupResCh <- lookupRes
if ctx.Err() == nil {
dht.refreshRTIfNoShortcut(kb.ConvertKey(key), lookupRes)
}
}()
return valCh, lookupResCh
}
func (dht *IpfsDHT) refreshRTIfNoShortcut(key kb.ID, lookupRes *lookupWithFollowupResult) {
if lookupRes.completed {
// refresh the cpl for this key as the query was successful
dht.routingTable.ResetCplRefreshedAtForID(key, time.Now())
}
}
// Provider abstraction for indirect stores.
// Some DHTs store values directly, while an indirect store stores pointers to
// locations of the value, similarly to Coral and Mainline DHT.
// Provide makes this node announce that it can provide a value for the given key.
func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err error) {
if !dht.enableProviders {
return routing.ErrNotSupported
}
keyMH := key.Hash()
eip := logger.EventBegin(ctx, "Provide", multihashLoggableKey(keyMH), logging.LoggableMap{"broadcast": brdcst})
defer func() {
if err != nil {
eip.SetError(err)
}
eip.Done()
}()
// add self locally
dht.providers.AddProvider(ctx, keyMH, dht.self)
if !brdcst {
return nil
}
closerCtx := ctx
if deadline, ok := ctx.Deadline(); ok {
now := time.Now()
timeout := deadline.Sub(now)
if timeout < 0 {
// timed out
return context.DeadlineExceeded
} else if timeout < 10*time.Second {
// Reserve 10% for the final put.
deadline = deadline.Add(-timeout / 10)
} else {
// Otherwise, reserve a second (we'll already be
// connected so this should be fast).
deadline = deadline.Add(-time.Second)
}
var cancel context.CancelFunc
closerCtx, cancel = context.WithDeadline(ctx, deadline)
defer cancel()
}
var exceededDeadline bool
peers, err := dht.GetClosestPeers(closerCtx, string(keyMH))
switch err {
case context.DeadlineExceeded:
// If the _inner_ deadline has been exceeded but the _outer_
// context is still fine, provide the value to the closest peers
// we managed to find, even if they're not the _actual_ closest peers.
if ctx.Err() != nil {
return ctx.Err()
}
exceededDeadline = true
case nil:
default:
return err
}
mes, err := dht.makeProvRecord(keyMH)
if err != nil {
return err
}
wg := sync.WaitGroup{}
for p := range peers {
wg.Add(1)
go func(p peer.ID) {
defer wg.Done()
logger.Debugf("putProvider(%s, %s)", keyMH, p)
err := dht.sendMessage(ctx, p, mes)
if err != nil {
logger.Debug(err)
}
}(p)
}
wg.Wait()
if exceededDeadline {
return context.DeadlineExceeded
}
return ctx.Err()
}
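// reserveForPut is a standalone sketch (not called by this package) of the
// deadline split performed by Provide above: keep 10% of a short time budget,
// or a flat second of a long one, in reserve for the final ADD_PROVIDER
// messages. It assumes the deadline has not already passed.
func reserveForPut(deadline time.Time) time.Time {
	timeout := time.Until(deadline)
	if timeout < 10*time.Second {
		// Short budget: reserve 10% for the final put.
		return deadline.Add(-timeout / 10)
	}
	// Long budget: a second is enough, since we will already be connected
	// to the peers we message.
	return deadline.Add(-time.Second)
}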
func (dht *IpfsDHT) makeProvRecord(key []byte) (*pb.Message, error) {
pi := peer.AddrInfo{
ID: dht.self,
Addrs: dht.host.Addrs(),
}
// // only share WAN-friendly addresses ??
// pi.Addrs = addrutil.WANShareableAddrs(pi.Addrs)
if len(pi.Addrs) < 1 {
return nil, fmt.Errorf("no known addresses for self, cannot put provider")
}
pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, key, 0)
pmes.ProviderPeers = pb.RawPeerInfosToPBPeers([]peer.AddrInfo{pi})
return pmes, nil
}
// FindProviders searches until the context expires.
func (dht *IpfsDHT) FindProviders(ctx context.Context, c cid.Cid) ([]peer.AddrInfo, error) {
if !dht.enableProviders {
return nil, routing.ErrNotSupported
}
var providers []peer.AddrInfo
for p := range dht.FindProvidersAsync(ctx, c, dht.bucketSize) {
providers = append(providers, p)
}
return providers, nil
}
// FindProvidersAsync is the same thing as FindProviders, but returns a channel.
// Peers will be returned on the channel as soon as they are found, even before
// the search query completes. If count is zero then the query will run until it
// completes. Note: not reading from the returned channel may block the query
// from progressing.
func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo {
if !dht.enableProviders {
peerOut := make(chan peer.AddrInfo)
close(peerOut)
return peerOut
}
chSize := count
if count == 0 {
chSize = 1
}
peerOut := make(chan peer.AddrInfo, chSize)
keyMH := key.Hash()
logger.Event(ctx, "findProviders", multihashLoggableKey(keyMH))
go dht.findProvidersAsyncRoutine(ctx, keyMH, count, peerOut)
return peerOut
}
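// Illustrative usage sketch, not part of the original file: the query stops
// making progress while nobody reads from the returned channel, so callers
// should drain it promptly. The count of 5 and the one-minute timeout are
// arbitrary.
func exampleFindProviders(ctx context.Context, dht *IpfsDHT, c cid.Cid) []peer.AddrInfo {
	ctx, cancel := context.WithTimeout(ctx, time.Minute)
	defer cancel()
	var provs []peer.AddrInfo
	for pi := range dht.FindProvidersAsync(ctx, c, 5) {
		provs = append(provs, pi) // delivered as soon as each provider is found
	}
	return provs
}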
func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash.Multihash, count int, peerOut chan peer.AddrInfo) {
defer logger.EventBegin(ctx, "findProvidersAsync", multihashLoggableKey(key)).Done()
defer close(peerOut)
findAll := count == 0
var ps *peer.Set
if findAll {
ps = peer.NewSet()
} else {
ps = peer.NewLimitedSet(count)
}
provs := dht.providers.GetProviders(ctx, key)
for _, p := range provs {
// NOTE: Assuming that this list of peers is unique
if ps.TryAdd(p) {
pi := dht.peerstore.PeerInfo(p)
select {
case peerOut <- pi:
case <-ctx.Done():
return
}
}
// If we have enough peers locally, don't bother with remote RPC
// TODO: is this a DOS vector?
if !findAll && ps.Size() >= count {
return
}
}
lookupRes, err := dht.runLookupWithFollowup(ctx, dht.d, string(key),
func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
// For DHT query command
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
Type: routing.SendingQuery,
ID: p,
})
pmes, err := dht.findProvidersSingle(ctx, p, key)
if err != nil {
return nil, err
}
logger.Debugf("%d provider entries", len(pmes.GetProviderPeers()))
provs := pb.PBPeersToPeerInfos(pmes.GetProviderPeers())
logger.Debugf("%d provider entries decoded", len(provs))
// Add unique providers from request, up to 'count'
for _, prov := range provs {
if prov.ID != dht.self {
dht.peerstore.AddAddrs(prov.ID, prov.Addrs, peerstore.TempAddrTTL)
}
logger.Debugf("got provider: %s", prov)
if ps.TryAdd(prov.ID) {
logger.Debugf("using provider: %s", prov)
select {
case peerOut <- *prov:
case <-ctx.Done():
logger.Debug("context timed out sending more providers")
return nil, ctx.Err()
}
}
if !findAll && ps.Size() >= count {
logger.Debugf("got enough providers (%d/%d)", ps.Size(), count)
return nil, nil
}
}
// Give closer peers back to the query to be queried
closer := pmes.GetCloserPeers()
peers := pb.PBPeersToPeerInfos(closer)
logger.Debugf("got closer peers: %d %s", len(peers), peers)
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
Type: routing.PeerResponse,
ID: p,
Responses: peers,
})
return peers, nil
},
func() bool {
return !findAll && ps.Size() >= count
},
)
if err == nil && ctx.Err() == nil {
dht.refreshRTIfNoShortcut(kb.ConvertKey(string(key)), lookupRes)
}
}
// FindPeer searches for a peer with given ID.
func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, err error) {
eip := logger.EventBegin(ctx, "FindPeer", id)
defer func() {
if err != nil {
eip.SetError(err)
}
eip.Done()
}()
// Check if we're already connected to them
if pi := dht.FindLocal(id); pi.ID != "" {
return pi, nil
}
lookupRes, err := dht.runLookupWithFollowup(ctx, dht.d, string(id),
func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
// For DHT query command
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
Type: routing.SendingQuery,
ID: p,
})
pmes, err := dht.findPeerSingle(ctx, p, id)
if err != nil {
logger.Debugf("error getting closer peers: %s", err)
return nil, err
}
peers := pb.PBPeersToPeerInfos(pmes.GetCloserPeers())
// For DHT query command
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
Type: routing.PeerResponse,
ID: p,
Responses: peers,
})
return peers, err
},
func() bool {
return dht.host.Network().Connectedness(id) == network.Connected
},
)
if err != nil {
return peer.AddrInfo{}, err
}
dialedPeerDuringQuery := false
for i, p := range lookupRes.peers {
if p == id {
// Note: we consider PeerUnreachable to be a valid state because the peer may not support the DHT protocol
// and therefore the peer would fail the query. The fact that a peer that is returned can be a non-DHT
// server peer and is not identified as such is a bug.
dialedPeerDuringQuery = lookupRes.state[i] != qpeerset.PeerHeard
break
}
}
// Return peer information if we tried to dial the peer during the query or we are (or recently were) connected
// to the peer.
connectedness := dht.host.Network().Connectedness(id)
if dialedPeerDuringQuery || connectedness == network.Connected || connectedness == network.CanConnect {
return dht.peerstore.PeerInfo(id), nil
}
return peer.AddrInfo{}, routing.ErrNotFound
}
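// Illustrative caller-side sketch, not part of the original file: FindPeer
// reports routing.ErrNotFound when the lookup completes without usable
// addresses, so callers usually separate "not found" from transport errors.
func exampleFindPeer(ctx context.Context, dht *IpfsDHT, id peer.ID) (peer.AddrInfo, bool, error) {
	pi, err := dht.FindPeer(ctx, id)
	switch err {
	case nil:
		return pi, true, nil
	case routing.ErrNotFound:
		return peer.AddrInfo{}, false, nil // lookup completed; peer unknown
	default:
		return peer.AddrInfo{}, false, err
	}
}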