mirror of
https://github.com/fluencelabs/go-libp2p-kad-dht
synced 2025-04-24 22:32:13 +00:00
refactor(dht): stopFn no longer takes any state. Beta publicly exposed as the Resiliency parameter.
This commit is contained in:
parent
49b07fa585
commit
516cd0b49d
2
dht.go
2
dht.go
@ -92,6 +92,7 @@ type IpfsDHT struct {
|
||||
|
||||
bucketSize int
|
||||
alpha int // The concurrency parameter per path
|
||||
beta int // The number of peers closest to a target that must have responded for a query path to terminate
|
||||
d int // Number of Disjoint Paths to query
|
||||
|
||||
autoRefresh bool
|
||||
@ -234,6 +235,7 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
|
||||
serverProtocols: serverProtocols,
|
||||
bucketSize: cfg.bucketSize,
|
||||
alpha: cfg.concurrency,
|
||||
beta: cfg.resiliency,
|
||||
d: cfg.disjointPaths,
|
||||
triggerRtRefresh: make(chan chan<- error),
|
||||
triggerSelfLookup: make(chan chan<- error),
|
||||
|
@ -34,6 +34,7 @@ type config struct {
|
||||
bucketSize int
|
||||
disjointPaths int
|
||||
concurrency int
|
||||
resiliency int
|
||||
maxRecordAge time.Duration
|
||||
enableProviders bool
|
||||
enableValues bool
|
||||
@ -87,6 +88,7 @@ var defaults = func(o *config) error {
|
||||
|
||||
o.bucketSize = defaultBucketSize
|
||||
o.concurrency = 3
|
||||
o.resiliency = 3
|
||||
|
||||
return nil
|
||||
}
|
||||
@ -239,6 +241,17 @@ func Concurrency(alpha int) Option {
|
||||
}
|
||||
}
|
||||
|
||||
// Resiliency configures the number of peers closest to a target that must have responded in order for a given query
|
||||
// path to complete.
|
||||
//
|
||||
// The default value is 3.
|
||||
func Resiliency(beta int) Option {
|
||||
return func(c *config) error {
|
||||
c.resiliency = beta
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// DisjointPaths configures the number of disjoint paths (d in the S/Kademlia paper) taken per query.
|
||||
//
|
||||
// The default value is BucketSize/2.
|
||||
|
@ -11,7 +11,6 @@ import (
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
logging "github.com/ipfs/go-log"
|
||||
"github.com/libp2p/go-libp2p-kad-dht/kpeerset"
|
||||
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
|
||||
kb "github.com/libp2p/go-libp2p-kbucket"
|
||||
"github.com/multiformats/go-base32"
|
||||
@ -100,7 +99,7 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee
|
||||
|
||||
return peers, err
|
||||
},
|
||||
func(peerset *kpeerset.SortedPeerset) bool { return false },
|
||||
func() bool { return false },
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
|
9
query.go
9
query.go
@ -17,7 +17,7 @@ import (
|
||||
var ErrNoPeersQueried = errors.New("failed to query any peers")
|
||||
|
||||
type queryFn func(context.Context, peer.ID) ([]*peer.AddrInfo, error)
|
||||
type stopFn func(*qpeerset.QueryPeerset) bool
|
||||
type stopFn func() bool
|
||||
|
||||
// query represents a single disjoint query.
|
||||
type query struct {
|
||||
@ -182,7 +182,7 @@ func (q *query) readyToTerminate() bool {
|
||||
return true
|
||||
}
|
||||
// give the application logic a chance to terminate
|
||||
if q.stopFn(q.queryPeers) {
|
||||
if q.stopFn() {
|
||||
return true
|
||||
}
|
||||
if q.isStarvationTermination() {
|
||||
@ -194,14 +194,11 @@ func (q *query) readyToTerminate() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// Beta paramerizes the Kademlia terminatioin condition.
|
||||
var Beta = 3
|
||||
|
||||
// From the set of all nodes that are not unreachable,
|
||||
// if the closest beta nodes are all queried, the lookup can terminate.
|
||||
func (q *query) isLookupTermination() bool {
|
||||
var peers []peer.ID
|
||||
peers = q.queryPeers.GetClosestNotUnreachable(Beta)
|
||||
peers = q.queryPeers.GetClosestNotUnreachable(q.dht.beta)
|
||||
for _, p := range peers {
|
||||
if !q.queryPeers.IsQueried(p) {
|
||||
return false
|
||||
|
@ -15,7 +15,6 @@ import (
|
||||
"github.com/ipfs/go-cid"
|
||||
u "github.com/ipfs/go-ipfs-util"
|
||||
logging "github.com/ipfs/go-log"
|
||||
"github.com/libp2p/go-libp2p-kad-dht/kpeerset"
|
||||
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
|
||||
kb "github.com/libp2p/go-libp2p-kbucket"
|
||||
record "github.com/libp2p/go-libp2p-record"
|
||||
@ -386,7 +385,7 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan st
|
||||
|
||||
return peers, err
|
||||
},
|
||||
func(peerset *kpeerset.SortedPeerset) bool {
|
||||
func() bool {
|
||||
select {
|
||||
case <-stopQuery:
|
||||
return true
|
||||
@ -412,7 +411,7 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan st
|
||||
func (dht *IpfsDHT) refreshRTIfNoShortcut(key kb.ID, queries []*query) {
|
||||
shortcutTaken := false
|
||||
for _, q := range queries {
|
||||
if q.localPeers.LenUnqueriedFromKClosest() > 0 {
|
||||
if len(q.queryPeers.GetClosestNotUnreachable(3)) > 0 {
|
||||
shortcutTaken = true
|
||||
break
|
||||
}
|
||||
@ -644,7 +643,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash
|
||||
|
||||
return peers, nil
|
||||
},
|
||||
func(peerset *kpeerset.SortedPeerset) bool {
|
||||
func() bool {
|
||||
return !findAll && ps.Size() >= count
|
||||
},
|
||||
)
|
||||
@ -693,7 +692,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo,
|
||||
|
||||
return peers, err
|
||||
},
|
||||
func(peerset *kpeerset.SortedPeerset) bool {
|
||||
func() bool {
|
||||
return dht.host.Network().Connectedness(id) == network.Connected
|
||||
},
|
||||
)
|
||||
|
Loading…
x
Reference in New Issue
Block a user