Mirror of https://github.com/fluencelabs/go-libp2p-kad-dht (synced 2025-04-24 22:32:13 +00:00)
Remove disjoint queries (#503)
* remove disjoint queries
* remove globally queried check
* removed dht rng field and mutex
* removed the Disjoint option

Co-authored-by: Adin Schmahmann <adin.schmahmann@gmail.com>
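In effect, a lookup no longer fans out into d independent query paths whose results are merged afterwards; a single query owns all of the seed peers and runs with greedy parallelism. The toy sketch below is not the library's code (lookupDisjoint, lookupSingle, and result are made-up names for illustration); it only contrasts the two control-flow shapes.

// Toy sketch (not the library's actual code) of the control-flow change:
// before, seeds were split across d concurrent paths and merged at the end;
// after, one query runs over all seeds.
package main

import "fmt"

type result struct{ peers []string }

// lookupDisjoint mimics the removed flow: split seeds across d paths,
// run each path in its own goroutine, then merge the per-path results.
func lookupDisjoint(seeds []string, d int, run func([]string) result) result {
	paths := make([][]string, d)
	for i, s := range seeds {
		paths[i%d] = append(paths[i%d], s)
	}
	done := make(chan result, d)
	for _, p := range paths {
		p := p
		go func() { done <- run(p) }()
	}
	var merged result
	for i := 0; i < d; i++ {
		merged.peers = append(merged.peers, (<-done).peers...)
	}
	return merged
}

// lookupSingle mimics the new flow: a single query sees every seed peer.
func lookupSingle(seeds []string, run func([]string) result) result {
	return run(seeds)
}

func main() {
	run := func(seeds []string) result { return result{peers: seeds} }
	fmt.Println(lookupDisjoint([]string{"a", "b", "c", "d"}, 2, run))
	fmt.Println(lookupSingle([]string{"a", "b", "c", "d"}, run))
}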
parent 99716d16b4
commit 314213d5d8
dht.go (12 changed lines)
@@ -5,7 +5,6 @@ import (
     "context"
     "errors"
     "fmt"
-    "math/rand"
     "sync"
     "time"

@@ -64,9 +63,7 @@ type IpfsDHT struct {
     routingTable *kb.RoutingTable // Array of routing tables for differently distanced nodes
     providers    *providers.ProviderManager

-    birth time.Time  // When this peer started up
-    rng   *rand.Rand // Source of randomness
-    rnglk sync.Mutex // Rand does not support concurrency
+    birth time.Time // When this peer started up

     Validator record.Validator

@@ -93,7 +90,6 @@ type IpfsDHT struct {
     bucketSize int
     alpha      int // The concurrency parameter per path
     beta       int // The number of peers closest to a target that must have responded for a query path to terminate
-    d          int // Number of Disjoint Paths to query

     autoRefresh           bool
     rtRefreshQueryTimeout time.Duration
@@ -125,9 +121,7 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
     if err := cfg.apply(append([]Option{defaults}, options...)...); err != nil {
         return nil, err
     }
-    if err := cfg.applyFallbacks(); err != nil {
-        return nil, err
-    }

     if err := cfg.validate(); err != nil {
         return nil, err
     }
@@ -225,13 +219,11 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
         host:              h,
         strmap:            make(map[peer.ID]*messageSender),
         birth:             time.Now(),
-        rng:               rand.New(rand.NewSource(rand.Int63())),
         protocols:         protocols,
         serverProtocols:   serverProtocols,
         bucketSize:        cfg.bucketSize,
         alpha:             cfg.concurrency,
         beta:              cfg.resiliency,
-        d:                 cfg.disjointPaths,
         triggerRtRefresh:  make(chan chan<- error),
         triggerSelfLookup: make(chan chan<- error),
     }

dht_options.go

@@ -32,7 +32,6 @@ type config struct {
     mode           ModeOpt
     protocolPrefix protocol.ID
     bucketSize     int
-    disjointPaths  int
     concurrency    int
     resiliency     int
     maxRecordAge   time.Duration
@@ -93,15 +92,6 @@ var defaults = func(o *config) error {
     return nil
 }

-// applyFallbacks sets default DHT options. It is applied after Defaults and any options passed to the constructor in
-// order to allow for defaults that are based on other set options.
-func (c *config) applyFallbacks() error {
-    if c.disjointPaths == 0 {
-        c.disjointPaths = c.bucketSize / 2
-    }
-    return nil
-}
-
 func (c *config) validate() error {
     if c.protocolPrefix == DefaultPrefix {
         if c.bucketSize != defaultBucketSize {
@@ -252,16 +242,6 @@ func Resiliency(beta int) Option {
     }
 }

-// DisjointPaths configures the number of disjoint paths (d in the S/Kademlia paper) taken per query.
-//
-// The default value is BucketSize/2.
-func DisjointPaths(d int) Option {
-    return func(c *config) error {
-        c.disjointPaths = d
-        return nil
-    }
-}
-
 // MaxRecordAge specifies the maximum time that any node will hold onto a record ("PutValue record")
 // from the time its received. This does not apply to any other forms of validity that
 // the record may contain.
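For callers of the package, the visible change is that the DisjointPaths option no longer exists. A minimal sketch of the constructor call, assuming the upstream import paths of this era; h is an already-running libp2p host and newServerDHT is a hypothetical helper:

package example

import (
	"context"

	"github.com/libp2p/go-libp2p-core/host"
	dht "github.com/libp2p/go-libp2p-kad-dht"
)

// newServerDHT is a hypothetical helper showing the constructor call.
func newServerDHT(ctx context.Context, h host.Host) (*dht.IpfsDHT, error) {
	// Before this commit the number of S/Kademlia disjoint paths could be tuned:
	//   dht.New(ctx, h, dht.Mode(dht.ModeServer), dht.DisjointPaths(4))
	// After it, the option is gone and lookups always run as a single query.
	return dht.New(ctx, h, dht.Mode(dht.ModeServer))
}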
dht_test.go

@@ -1755,7 +1755,6 @@ func TestProtocolUpgrade(t *testing.T) {
         Mode(ModeServer),
         NamespacedValidator("v", blankValidator{}),
         DisableAutoRefresh(),
-        DisjointPaths(1),
     }

     // This test verifies that we can have a node serving both old and new DHTs that will respond as a server to the old
lookup.go

@@ -75,7 +75,7 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee
     e := logger.EventBegin(ctx, "getClosestPeers", loggableKey(key))
     defer e.Done()

-    lookupRes, err := dht.runLookupWithFollowup(ctx, dht.d, key,
+    lookupRes, err := dht.runLookupWithFollowup(ctx, key,
         func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
             // For DHT query command
             routing.PublishQueryEvent(ctx, &routing.QueryEvent{
query.go (128 changed lines)
@ -20,7 +20,7 @@ var ErrNoPeersQueried = errors.New("failed to query any peers")
|
||||
type queryFn func(context.Context, peer.ID) ([]*peer.AddrInfo, error)
|
||||
type stopFn func() bool
|
||||
|
||||
// query represents a single disjoint query.
|
||||
// query represents a single DHT query.
|
||||
type query struct {
|
||||
// the query context.
|
||||
ctx context.Context
|
||||
@@ -40,9 +40,6 @@ type query struct {
     // Its role is to make sure that once termination is determined, it is sticky.
     terminated bool

-    // globallyQueriedPeers is the combined set of peers queried across ALL the disjoint queries.
-    globallyQueriedPeers *peer.Set // TODO: abstract this away from specifics of disjoint paths
-
     // the function that will be used to query a single peer.
     queryFn queryFn

@@ -51,8 +48,8 @@ type query struct {
 }

 type lookupWithFollowupResult struct {
-    peers []peer.ID            // the top K not unreachable peers across all query paths
-    state []qpeerset.PeerState // the peer states at the end of the queries
+    peers []peer.ID            // the top K not unreachable peers at the end of the query
+    state []qpeerset.PeerState // the peer states at the end of the query

     // indicates that neither the lookup nor the followup has been prematurely terminated by an external condition such
     // as context cancellation or the stop function being called.
@@ -66,9 +63,9 @@ type lookupWithFollowupResult struct {
 //
 // After the lookup is complete the query function is run (unless stopped) against all of the top K peers from the
 // lookup that have not already been successfully queried.
-func (dht *IpfsDHT) runLookupWithFollowup(ctx context.Context, d int, target string, queryFn queryFn, stopFn stopFn) (*lookupWithFollowupResult, error) {
+func (dht *IpfsDHT) runLookupWithFollowup(ctx context.Context, target string, queryFn queryFn, stopFn stopFn) (*lookupWithFollowupResult, error) {
     // run the query
-    lookupRes, err := dht.runDisjointQueries(ctx, d, target, queryFn, stopFn)
+    lookupRes, err := dht.runQuery(ctx, target, queryFn, stopFn)
     if err != nil {
         return nil, err
     }
@@ -125,8 +122,7 @@ processFollowUp:
     return lookupRes, nil
 }

-// d is the number of disjoint queries.
-func (dht *IpfsDHT) runDisjointQueries(ctx context.Context, d int, target string, queryFn queryFn, stopFn stopFn) (*lookupWithFollowupResult, error) {
+func (dht *IpfsDHT) runQuery(ctx context.Context, target string, queryFn queryFn, stopFn stopFn) (*lookupWithFollowupResult, error) {
     queryCtx, cancelQuery := context.WithCancel(ctx)

     // pick the K closest peers to the key in our Routing table and shuffle them.
@@ -140,103 +136,50 @@ func (dht *IpfsDHT) runDisjointQueries(ctx context.Context, d int, target string
         return nil, kb.ErrLookupFailure
     }

-    dht.rnglk.Lock()
-    dht.rng.Shuffle(len(seedPeers), func(i, j int) {
-        seedPeers[i], seedPeers[j] = seedPeers[j], seedPeers[i]
-    })
-    dht.rnglk.Unlock()
-
-    // create "d" disjoint queries
-    queries := make([]*query, d)
-    peersQueried := peer.NewSet()
-    for i := 0; i < d; i++ {
-        query := &query{
-            ctx:                  queryCtx,
-            cancel:               cancelQuery,
-            dht:                  dht,
-            queryPeers:           qpeerset.NewQueryPeerset(target),
-            seedPeers:            []peer.ID{},
-            terminated:           false,
-            globallyQueriedPeers: peersQueried,
-            queryFn:              queryFn,
-            stopFn:               stopFn,
-        }
-
-        queries[i] = query
-    }
-
-    // distribute the shuffled K closest peers as seeds among the "d" disjoint queries
-    for i := 0; i < len(seedPeers); i++ {
-        queries[i%d].seedPeers = append(queries[i%d].seedPeers, seedPeers[i])
-    }
-
-    // start the "d" disjoint queries
-    queryDone := make(chan struct{}, d)
-    for i := 0; i < d; i++ {
-        query := queries[i]
-        go func() {
-            query.runWithGreedyParallelism()
-            queryDone <- struct{}{}
-        }()
-    }
-
-    // wait for all the "d" disjoint queries to complete before we return
-    // XXX: Waiting until all queries are done is a vector for DoS attacks:
-    // The disjoint lookup paths that are taken over by adversarial peers
-    // can easily be fooled to go on forever.
-    numQueriesComplete := 0
-    for {
-        <-queryDone
-        numQueriesComplete++
-        if numQueriesComplete == d {
-            break
-        }
-    }
-
-    res := dht.constructLookupResult(queries, targetKadID)
+    q := &query{
+        ctx:        queryCtx,
+        cancel:     cancelQuery,
+        dht:        dht,
+        queryPeers: qpeerset.NewQueryPeerset(target),
+        seedPeers:  seedPeers,
+        terminated: false,
+        queryFn:    queryFn,
+        stopFn:     stopFn,
+    }
+
+    // run the query
+    q.runWithGreedyParallelism()
+
+    res := q.constructLookupResult(targetKadID)
     return res, nil
 }

 // constructLookupResult takes the query information and uses it to construct the lookup result
-func (dht *IpfsDHT) constructLookupResult(queries []*query, target kb.ID) *lookupWithFollowupResult {
-    // determine if any queries terminated early
+func (q *query) constructLookupResult(target kb.ID) *lookupWithFollowupResult {
+    // determine if the query terminated early
     completed := true
-    for _, q := range queries {
-        if !(q.isLookupTermination() || q.isStarvationTermination()) {
-            completed = false
-            break
-        }
+
+    if !(q.isLookupTermination()) {
+        completed = false
     }

-    // extract the top K not unreachable peers from each query path, as well as their states at the end of the queries
+    // extract the top K not unreachable peers
     var peers []peer.ID
     peerState := make(map[peer.ID]qpeerset.PeerState)
-    for _, q := range queries {
-        qp := q.queryPeers.GetClosestNotUnreachable(dht.bucketSize)
-        for _, p := range qp {
-            // Since the same peer can be seen in multiple queries use the "best" state for the peer
-            // Note: It's possible that a peer was marked undialable in one path, but wasn't yet tried in another path
-            // for now we're going to return that peer as long as some path does not think it is undialable. This may
-            // change in the future if we track addresses dialed per path.
-            state := q.queryPeers.GetState(p)
-            if currState, ok := peerState[p]; ok {
-                if state > currState {
-                    peerState[p] = state
-                }
-            } else {
-                peerState[p] = state
-                peers = append(peers, p)
-            }
-        }
+    qp := q.queryPeers.GetClosestNotUnreachable(q.dht.bucketSize)
+    for _, p := range qp {
+        state := q.queryPeers.GetState(p)
+        peerState[p] = state
+        peers = append(peers, p)
     }

     // get the top K overall peers
     sortedPeers := kb.SortClosestPeers(peers, target)
-    if len(sortedPeers) > dht.bucketSize {
-        sortedPeers = sortedPeers[:dht.bucketSize]
+    if len(sortedPeers) > q.dht.bucketSize {
+        sortedPeers = sortedPeers[:q.dht.bucketSize]
     }

-    // return the top K not unreachable peers across all query paths as well as their states at the end of the queries
+    // return the top K not unreachable peers as well as their states at the end of the query
     res := &lookupWithFollowupResult{
         peers: sortedPeers,
         state: make([]qpeerset.PeerState, len(sortedPeers)),
@ -357,13 +300,6 @@ func (q *query) queryPeer(ch chan<- *queryUpdate, p peer.ID) {
|
||||
return
|
||||
}
|
||||
|
||||
// add the peer to the global set of queried peers since the dial was successful
|
||||
// so that no other disjoint query tries sending an RPC to the same peer
|
||||
if !q.globallyQueriedPeers.TryAdd(p) {
|
||||
ch <- &queryUpdate{unreachable: []peer.ID{p}}
|
||||
return
|
||||
}
|
||||
|
||||
// send query RPC to the remote peer
|
||||
newPeers, err := q.queryFn(queryCtx, p)
|
||||
if err != nil {
|
||||
|
routing.go

@@ -332,7 +332,7 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan st
     go func() {
         defer close(valCh)
         defer close(lookupResCh)
-        lookupRes, err := dht.runLookupWithFollowup(ctx, dht.d, key,
+        lookupRes, err := dht.runLookupWithFollowup(ctx, key,
             func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
                 // For DHT query command
                 routing.PublishQueryEvent(ctx, &routing.QueryEvent{
@@ -580,7 +580,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash
         }
     }

-    lookupRes, err := dht.runLookupWithFollowup(ctx, dht.d, string(key),
+    lookupRes, err := dht.runLookupWithFollowup(ctx, string(key),
         func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
             // For DHT query command
             routing.PublishQueryEvent(ctx, &routing.QueryEvent{
@@ -656,7 +656,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo,
         return pi, nil
     }

-    lookupRes, err := dht.runLookupWithFollowup(ctx, dht.d, string(id),
+    lookupRes, err := dht.runLookupWithFollowup(ctx, string(id),
        func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
            // For DHT query command
            routing.PublishQueryEvent(ctx, &routing.QueryEvent{