Adin Schmahmann fcbc5f9e7f
Asynchronous lookups (#498)
* feat(query): fully asynchronous implementation of the Kademlia lookup. Peers returned from the lookup are not guaranteed to be alive (i.e. we are only guaranteed to have dialed the closest beta peers to the target), but given stable and correct routing tables most of the returned peers can be expected to be alive.
* feat(query): add a lookup follow-up function that runs after the lookup completes and ensures that the closest k peers returned from a lookup have been queried, even for beta < k
* refactor(query): the structure returned from lookups is now a useful subset of the full query state instead of the entire query state
* feat(options): the beta parameter is exposed as the Resiliency option (see the usage sketch after this list)
* feat(routing): do not mark the routing table as updated after a FindPeer query
* feat(routing): FindPeer can return addresses even if the peer is not Connected, as long as it was either recently connected (CanConnect) or was discovered during the lookup
* feat(bootstrap): bootstrap logic now uses GetClosestPeers instead of FindPeer
* refactor(dht): stopFn no longer takes any state
* fix(test): the GetClosestPeers test now assumes only beta (rather than k) reachable peers, which is more appropriate given the query logic changes and the poor routing tables in that test, i.e. a ring network with arbitrary peer IDs
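
A usage sketch of the new semantics (illustrative only: it assumes the Resiliency option exported by this package and the GetClosestPeers signature in this module; ctx, h, and targetKey are stand-ins):

    // beta = 5: each lookup path terminates once the 5 closest known
    // peers to the target have responded; the follow-up phase then
    // queries the remaining closest-k results before returning them.
    d, err := dht.New(ctx, h, dht.Resiliency(5))
    if err != nil {
        // handle err
    }
    peerCh, err := d.GetClosestPeers(ctx, string(targetKey))
    // peerCh yields the closest peers found; they are not guaranteed to
    // be alive, but with healthy routing tables most are expected to be.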

Co-authored-by: Petar Maymounkov <petarm@gmail.com>
Co-authored-by: Aarsh Shah <aarshkshah1992@gmail.com>
2020-03-24 11:17:48 -04:00

package dht

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"math/rand"
	"sync"
	"time"

	"github.com/libp2p/go-libp2p-core/host"
	"github.com/libp2p/go-libp2p-core/network"
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/peerstore"
	"github.com/libp2p/go-libp2p-core/protocol"
	"github.com/libp2p/go-libp2p-core/routing"

	"go.opencensus.io/tag"
	"golang.org/x/xerrors"

	"github.com/libp2p/go-libp2p-kad-dht/metrics"
	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
	"github.com/libp2p/go-libp2p-kad-dht/providers"

	"github.com/gogo/protobuf/proto"
	ds "github.com/ipfs/go-datastore"
	logging "github.com/ipfs/go-log"
	"github.com/jbenet/goprocess"
	goprocessctx "github.com/jbenet/goprocess/context"
	kb "github.com/libp2p/go-libp2p-kbucket"
	record "github.com/libp2p/go-libp2p-record"
	recpb "github.com/libp2p/go-libp2p-record/pb"
	"github.com/multiformats/go-base32"
	"github.com/multiformats/go-multihash"
)
var logger = logging.Logger("dht")
var rtPvLogger = logging.Logger("dht/rt-validation")
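// BaseConnMgrScore is the base score given to DHT peers in the connection
// manager's "kbucket" tag; the peer's common prefix length with the local
// peer is added on top (see makeRoutingTable).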
const BaseConnMgrScore = 5
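// mode indicates whether the DHT is operating as a server (responding to
// queries) or as a client (only issuing them).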
type mode int
const (
modeServer mode = 1
modeClient mode = 2
)
const (
kad1 protocol.ID = "/kad/1.0.0"
kad2 protocol.ID = "/kad/2.0.0"
)
// IpfsDHT is an implementation of Kademlia with S/Kademlia modifications.
// It is used to implement the base Routing module.
type IpfsDHT struct {
host host.Host // the network services we need
self peer.ID // Local peer (yourself)
peerstore peerstore.Peerstore // Peer Registry
datastore ds.Datastore // Local data
routingTable *kb.RoutingTable // Routing table of known DHT peers, bucketed by XOR distance from us
providers *providers.ProviderManager
birth time.Time // When this peer started up
rng *rand.Rand // Source of randomness
rnglk sync.Mutex // Rand does not support concurrency
Validator record.Validator
ctx context.Context
proc goprocess.Process
strmap map[peer.ID]*messageSender
smlk sync.Mutex
plk sync.Mutex
stripedPutLocks [256]sync.Mutex
// Primary DHT protocols - we query and respond to these protocols
protocols []protocol.ID
// DHT protocols we can respond to (may contain protocols in addition to the primary protocols)
serverProtocols []protocol.ID
auto bool
mode mode
modeLk sync.Mutex
bucketSize int
alpha int // The concurrency parameter per path
beta int // The number of peers closest to a target that must have responded for a query path to terminate
d int // Number of Disjoint Paths to query
autoRefresh bool
rtRefreshQueryTimeout time.Duration
rtRefreshPeriod time.Duration
triggerRtRefresh chan chan<- error
triggerSelfLookup chan chan<- error
maxRecordAge time.Duration
// Allows disabling dht subsystems. These should _only_ be set on
// "forked" DHTs (e.g., DHTs with custom protocols and/or private
// networks).
enableProviders, enableValues bool
}
// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
// guarantee, but we can use them to aid refactoring.
var (
_ routing.ContentRouting = (*IpfsDHT)(nil)
_ routing.Routing = (*IpfsDHT)(nil)
_ routing.PeerRouting = (*IpfsDHT)(nil)
_ routing.PubKeyFetcher = (*IpfsDHT)(nil)
_ routing.ValueStore = (*IpfsDHT)(nil)
)
// New creates a new DHT with the specified host and options.
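//
// A minimal usage sketch, assuming a libp2p host h and a context ctx are
// already in scope (Mode and Datastore are options from this package, as
// used by NewDHT and NewDHTClient below):
//
//	d, err := New(ctx, h, Mode(ModeServer))
//	if err != nil {
//		// handle err
//	}
//	defer d.Close()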
func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error) {
var cfg config
if err := cfg.apply(append([]Option{defaults}, options...)...); err != nil {
return nil, err
}
if err := cfg.applyFallbacks(); err != nil {
return nil, err
}
if err := cfg.validate(); err != nil {
return nil, err
}
dht, err := makeDHT(ctx, h, cfg)
if err != nil {
return nil, fmt.Errorf("failed to create DHT, err=%s", err)
}
dht.autoRefresh = cfg.routingTable.autoRefresh
dht.rtRefreshPeriod = cfg.routingTable.refreshPeriod
dht.rtRefreshQueryTimeout = cfg.routingTable.refreshQueryTimeout
dht.maxRecordAge = cfg.maxRecordAge
dht.enableProviders = cfg.enableProviders
dht.enableValues = cfg.enableValues
dht.Validator = cfg.validator
switch cfg.mode {
case ModeAuto:
dht.auto = true
dht.mode = modeClient
case ModeClient:
dht.auto = false
dht.mode = modeClient
case ModeServer:
dht.auto = false
dht.mode = modeServer
default:
return nil, fmt.Errorf("invalid dht mode %d", cfg.mode)
}
if dht.mode == modeServer {
if err := dht.moveToServerMode(); err != nil {
return nil, err
}
}
// register for event bus and network notifications
sn, err := newSubscriberNotifiee(dht)
if err != nil {
return nil, err
}
dht.proc.Go(sn.subscribe)
// handle providers
dht.proc.AddChild(dht.providers.Process())
dht.startSelfLookup()
dht.startRefreshing()
return dht, nil
}
// NewDHT creates a new DHT object with the given peer as the 'local' host.
// IpfsDHT's initialized with this function will respond to DHT requests,
// whereas IpfsDHT's initialized with NewDHTClient will not.
func NewDHT(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
dht, err := New(ctx, h, Datastore(dstore))
if err != nil {
panic(err)
}
return dht
}
// NewDHTClient creates a new DHT object with the given peer as the 'local'
// host. IpfsDHT clients initialized with this function will not respond to DHT
// requests. If you need a peer to respond to DHT requests, use NewDHT instead.
func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT {
dht, err := New(ctx, h, Datastore(dstore), Mode(ModeClient))
if err != nil {
panic(err)
}
return dht
}
func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
protocols := []protocol.ID{cfg.protocolPrefix + kad2}
serverProtocols := []protocol.ID{cfg.protocolPrefix + kad2, cfg.protocolPrefix + kad1}
// check if custom test protocols were set
if len(cfg.testProtocols) > 0 {
protocols = make([]protocol.ID, len(cfg.testProtocols))
serverProtocols = make([]protocol.ID, len(cfg.testProtocols))
for i, p := range cfg.testProtocols {
protocols[i] = cfg.protocolPrefix + p
serverProtocols[i] = cfg.protocolPrefix + p
}
}
dht := &IpfsDHT{
datastore: cfg.datastore,
self: h.ID(),
peerstore: h.Peerstore(),
host: h,
strmap: make(map[peer.ID]*messageSender),
birth: time.Now(),
rng: rand.New(rand.NewSource(rand.Int63())),
protocols: protocols,
serverProtocols: serverProtocols,
bucketSize: cfg.bucketSize,
alpha: cfg.concurrency,
beta: cfg.resiliency,
d: cfg.disjointPaths,
triggerRtRefresh: make(chan chan<- error),
triggerSelfLookup: make(chan chan<- error),
}
// construct routing table
rt, err := makeRoutingTable(dht, cfg)
if err != nil {
return nil, fmt.Errorf("failed to construct routing table,err=%s", err)
}
dht.routingTable = rt
// create a DHT proc with the given context
dht.proc = goprocessctx.WithContext(ctx)
// create a tagged context derived from the original context
ctxTags := dht.newContextWithLocalTags(ctx)
// the DHT context should be done when the process is closed
dht.ctx = goprocessctx.WithProcessClosing(ctxTags, dht.proc)
dht.providers = providers.NewProviderManager(dht.ctx, h.ID(), cfg.datastore)
return dht, nil
}
func makeRoutingTable(dht *IpfsDHT, cfg config) (*kb.RoutingTable, error) {
self := kb.ConvertPeerID(dht.host.ID())
// construct the routing table with a peer validation function
pvF := func(c context.Context, p peer.ID) bool {
// connect should work
if err := dht.host.Connect(c, peer.AddrInfo{ID: p}); err != nil {
rtPvLogger.Infof("failed to connect to peer %s for validation, err=%s", p, err)
return false
}
// peer should support the DHT protocol
b, err := dht.validRTPeer(p)
if err != nil {
rtPvLogger.Errorf("failed to check if peer %s supports DHT protocol, err=%s", p, err)
return false
}
return b
}
rtOpts := []kb.Option{kb.PeerValidationFnc(pvF)}
if cfg.routingTable.checkInterval != 0 {
rtOpts = append(rtOpts, kb.TableCleanupInterval(cfg.routingTable.checkInterval))
}
rt, err := kb.NewRoutingTable(cfg.bucketSize, self, time.Minute, dht.host.Peerstore(),
rtOpts...)
cmgr := dht.host.ConnManager()
rt.PeerAdded = func(p peer.ID) {
commonPrefixLen := kb.CommonPrefixLen(self, kb.ConvertPeerID(p))
cmgr.TagPeer(p, "kbucket", BaseConnMgrScore+commonPrefixLen)
}
rt.PeerRemoved = func(p peer.ID) {
cmgr.UntagPeer(p, "kbucket")
}
return rt, err
}
// putValueToPeer stores the given key/value pair at the peer 'p'
func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID, rec *recpb.Record) error {
pmes := pb.NewMessage(pb.Message_PUT_VALUE, rec.Key, 0)
pmes.Record = rec
rpmes, err := dht.sendRequest(ctx, p, pmes)
if err != nil {
logger.Debugf("putValueToPeer: %v. (peer: %s, key: %s)", err, p.Pretty(), loggableKey(string(rec.Key)))
return err
}
if !bytes.Equal(rpmes.GetRecord().Value, pmes.GetRecord().Value) {
logger.Warningf("putValueToPeer: value not put correctly. (%v != %v)", pmes, rpmes)
return errors.New("value not put correctly")
}
return nil
}
var errInvalidRecord = errors.New("received invalid record")
// getValueOrPeers queries a particular peer p for the value for
// key. It returns either the value or a list of closer peers.
// NOTE: It will update the dht's peerstore with any new addresses
// it finds for the given peer.
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID, key string) (*recpb.Record, []*peer.AddrInfo, error) {
pmes, err := dht.getValueSingle(ctx, p, key)
if err != nil {
return nil, nil, err
}
// Perhaps we were given closer peers
peers := pb.PBPeersToPeerInfos(pmes.GetCloserPeers())
if record := pmes.GetRecord(); record != nil {
// Success! We were given the value
logger.Debug("getValueOrPeers: got value")
// make sure record is valid.
err = dht.Validator.Validate(string(record.GetKey()), record.GetValue())
if err != nil {
logger.Info("Received invalid record! (discarded)")
// return a sentinel to signify an invalid record was received
err = errInvalidRecord
record = new(recpb.Record)
}
return record, peers, err
}
if len(peers) > 0 {
logger.Debug("getValueOrPeers: peers")
return nil, peers, nil
}
logger.Warning("getValueOrPeers: routing.ErrNotFound")
return nil, nil, routing.ErrNotFound
}
// getValueSingle simply performs the get value RPC with the given parameters
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID, key string) (*pb.Message, error) {
meta := logging.LoggableMap{
"key": key,
"peer": p,
}
eip := logger.EventBegin(ctx, "getValueSingle", meta)
defer eip.Done()
pmes := pb.NewMessage(pb.Message_GET_VALUE, []byte(key), 0)
resp, err := dht.sendRequest(ctx, p, pmes)
switch err {
case nil:
return resp, nil
case ErrReadTimeout:
logger.Warningf("getValueSingle: read timeout %s %s", p.Pretty(), key)
fallthrough
default:
eip.SetError(err)
return nil, err
}
}
// getLocal attempts to retrieve the value from the datastore
func (dht *IpfsDHT) getLocal(key string) (*recpb.Record, error) {
logger.Debugf("getLocal %s", key)
rec, err := dht.getRecordFromDatastore(mkDsKey(key))
if err != nil {
logger.Warningf("getLocal: %s", err)
return nil, err
}
// Double check the key. Can't hurt.
if rec != nil && string(rec.GetKey()) != key {
logger.Errorf("BUG getLocal: found a DHT record that didn't match it's key: %s != %s", rec.GetKey(), key)
return nil, nil
}
return rec, nil
}
// putLocal stores the key value pair in the datastore
func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error {
logger.Debugf("putLocal: %v %v", key, rec)
data, err := proto.Marshal(rec)
if err != nil {
logger.Warningf("putLocal: %s", err)
return err
}
return dht.datastore.Put(mkDsKey(key), data)
}
// peerFound signals the routingTable that we've found a peer that
// supports the DHT protocol.
func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID) {
logger.Event(ctx, "peerFound", p)
dht.routingTable.HandlePeerAlive(p)
}
// peerStoppedDHT signals the routing table that a peer has stopped supporting the DHT protocol.
func (dht *IpfsDHT) peerStoppedDHT(ctx context.Context, p peer.ID) {
logger.Event(ctx, "peerStoppedDHT", p)
// A peer that does not support the DHT protocol is dead for us.
// There's no point in talking to it anymore until it starts supporting the DHT protocol again.
dht.routingTable.HandlePeerDead(p)
}
// peerDisconnected signals the routing table that a peer is not connected anymore.
func (dht *IpfsDHT) peerDisconnected(ctx context.Context, p peer.ID) {
logger.Event(ctx, "peerDisconnected", p)
dht.routingTable.HandlePeerDisconnect(p)
}
// FindLocal looks for a peer with a given ID connected to this DHT and returns its AddrInfo
// if it is connected or was recently connected; otherwise an empty AddrInfo is returned.
func (dht *IpfsDHT) FindLocal(id peer.ID) peer.AddrInfo {
switch dht.host.Network().Connectedness(id) {
case network.Connected, network.CanConnect:
return dht.peerstore.PeerInfo(id)
default:
return peer.AddrInfo{}
}
}
// findPeerSingle asks peer 'p' if it knows where the peer with id 'id' is
func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (*pb.Message, error) {
eip := logger.EventBegin(ctx, "findPeerSingle",
logging.LoggableMap{
"peer": p,
"target": id,
})
defer eip.Done()
pmes := pb.NewMessage(pb.Message_FIND_NODE, []byte(id), 0)
resp, err := dht.sendRequest(ctx, p, pmes)
switch err {
case nil:
return resp, nil
case ErrReadTimeout:
logger.Warningf("read timeout: %s %s", p.Pretty(), id)
fallthrough
default:
eip.SetError(err)
return nil, err
}
}
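// findProvidersSingle asks peer 'p' for providers of the given key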
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key multihash.Multihash) (*pb.Message, error) {
eip := logger.EventBegin(ctx, "findProvidersSingle", p, multihashLoggableKey(key))
defer eip.Done()
pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, key, 0)
resp, err := dht.sendRequest(ctx, p, pmes)
switch err {
case nil:
return resp, nil
case ErrReadTimeout:
logger.Warningf("read timeout: %s %s", p.Pretty(), key)
fallthrough
default:
eip.SetError(err)
return nil, err
}
}
// nearestPeersToQuery returns the routing table's closest peers.
func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID {
closer := dht.routingTable.NearestPeers(kb.ConvertKey(string(pmes.GetKey())), count)
return closer
}
// betterPeersToQuery returns nearestPeersToQuery with some additional filtering
func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, p peer.ID, count int) []peer.ID {
closer := dht.nearestPeersToQuery(pmes, count)
// no node? nil
if closer == nil {
logger.Warning("betterPeersToQuery: no closer peers to send:", p)
return nil
}
filtered := make([]peer.ID, 0, len(closer))
for _, clp := range closer {
// == to self? that's bad
if clp == dht.self {
logger.Error("BUG betterPeersToQuery: attempted to return self! this shouldn't happen...")
return nil
}
// Don't send a peer back to itself
if clp == p {
continue
}
filtered = append(filtered, clp)
}
// ok seems like closer nodes
return filtered
}
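// setMode switches the DHT between server and client mode, adding or removing
// the server protocol stream handlers as needed.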
func (dht *IpfsDHT) setMode(m mode) error {
dht.modeLk.Lock()
defer dht.modeLk.Unlock()
if m == dht.mode {
return nil
}
switch m {
case modeServer:
return dht.moveToServerMode()
case modeClient:
return dht.moveToClientMode()
default:
return fmt.Errorf("unrecognized dht mode: %d", m)
}
}
// moveToServerMode advertises (via libp2p identify updates) that we are able to respond to DHT queries and sets the appropriate stream handlers.
// Note: We may support responding to queries with protocols aside from our primary ones in order to support
// interoperability with older versions of the DHT protocol.
func (dht *IpfsDHT) moveToServerMode() error {
dht.mode = modeServer
for _, p := range dht.serverProtocols {
dht.host.SetStreamHandler(p, dht.handleNewStream)
}
return nil
}
// moveToClientMode stops advertising (and rescinds advertisements via libp2p identify updates) that we are able to
// respond to DHT queries and removes the appropriate stream handlers. We also kill all inbound streams that were
// utilizing the handled protocols.
// Note: We may support responding to queries with protocols aside from our primary ones in order to support
// interoperability with older versions of the DHT protocol.
func (dht *IpfsDHT) moveToClientMode() error {
dht.mode = modeClient
for _, p := range dht.serverProtocols {
dht.host.RemoveStreamHandler(p)
}
pset := make(map[protocol.ID]bool)
for _, p := range dht.serverProtocols {
pset[p] = true
}
for _, c := range dht.host.Network().Conns() {
for _, s := range c.GetStreams() {
if pset[s.Protocol()] && s.Stat().Direction == network.DirInbound {
s.Reset()
}
}
}
return nil
}
func (dht *IpfsDHT) getMode() mode {
dht.modeLk.Lock()
defer dht.modeLk.Unlock()
return dht.mode
}
// Context returns the DHT's context.
func (dht *IpfsDHT) Context() context.Context {
return dht.ctx
}
// Process returns the DHT's goprocess.
func (dht *IpfsDHT) Process() goprocess.Process {
return dht.proc
}
// RoutingTable returns the DHT's routing table.
func (dht *IpfsDHT) RoutingTable() *kb.RoutingTable {
return dht.routingTable
}
// Close shuts down the DHT by closing its process.
func (dht *IpfsDHT) Close() error {
return dht.proc.Close()
}
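// mkDsKey converts a DHT record key, which may be arbitrary bytes, into a
// safe datastore key by base32-encoding it.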
func mkDsKey(s string) ds.Key {
return ds.NewKey(base32.RawStdEncoding.EncodeToString([]byte(s)))
}
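// PeerID returns the DHT node's Peer ID.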
func (dht *IpfsDHT) PeerID() peer.ID {
return dht.self
}
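// PeerKey returns the Kademlia key of the DHT node, i.e. its Peer ID
// converted into the XOR keyspace (see kb.ConvertPeerID).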
func (dht *IpfsDHT) PeerKey() []byte {
return kb.ConvertPeerID(dht.self)
}
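// Host returns the libp2p host this DHT is operating over.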
func (dht *IpfsDHT) Host() host.Host {
return dht.host
}
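// Ping sends a DHT ping request to the given peer and waits for the matching response.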
func (dht *IpfsDHT) Ping(ctx context.Context, p peer.ID) error {
req := pb.NewMessage(pb.Message_PING, nil, 0)
resp, err := dht.sendRequest(ctx, p, req)
if err != nil {
return xerrors.Errorf("sending request: %w", err)
}
if resp.Type != pb.Message_PING {
return xerrors.Errorf("got unexpected response type: %v", resp.Type)
}
return nil
}
// newContextWithLocalTags returns a new context.Context with the InstanceID and
// PeerID keys populated. Any extra tags that need adding to the context can be
// passed in as tag.Mutators.
func (dht *IpfsDHT) newContextWithLocalTags(ctx context.Context, extraTags ...tag.Mutator) context.Context {
extraTags = append(
extraTags,
tag.Upsert(metrics.KeyPeerID, dht.self.Pretty()),
tag.Upsert(metrics.KeyInstanceID, fmt.Sprintf("%p", dht)),
)
ctx, _ = tag.New(
ctx,
extraTags...,
) // ignoring error as it is unrelated to the actual function of this code.
return ctx
}