mirror of
https://github.com/fluencelabs/go-libp2p-kad-dht
synced 2025-04-24 22:32:13 +00:00
* feat(query): fully async implementation of Kademlia lookup. peers returned from the lookup are not guaranteed to be alive (i.e. we're only guaranteed to have dialed the closest beta peers to the target), but given stable and correct routing tables the expectation that most of the peers returned are alive is high. * feat(query): add wrapper lookup followup function to followup after the lookup is completed and ensure that the closest k returned peers from a lookup have been queried even for beta < k * refactor(query) modified the structure returned from lookups to be a useful subset of the full query state instead of the entire query state * feat(options): beta parameter exposed as the Resiliency parameter * feat(routing): do not mark the routing table as updated after a FindPeer query * feat(routing): FindPeer can return addresses even if not Connected as long as it was either recently connected (CanConnect) or was discovered during the lookup * feat(bootstrap): bootstrap logic now uses GetClosestPeers instead of FindPeer * refactor(dht): stopFn no longer takes any state * fix(test): changed GetClosestPeers test to only assume beta instead of k peers since that is now more appropriate given the query logic changes and that the routing tables in that test are bad, i.e. a ring network with arbitrary peerIDs Co-authored-by: Petar Maymounkov <petarm@gmail.com> Co-authored-by: Aarsh Shah <aarshkshah1992@gmail.com>
323 lines
9.4 KiB
Go
323 lines
9.4 KiB
Go
package dht
|
|
|
|
import (
|
|
"fmt"
|
|
"time"
|
|
|
|
ds "github.com/ipfs/go-datastore"
|
|
dssync "github.com/ipfs/go-datastore/sync"
|
|
"github.com/libp2p/go-libp2p-core/protocol"
|
|
"github.com/libp2p/go-libp2p-record"
|
|
)
|
|
|
|
// ModeOpt describes what mode the dht should operate in
|
|
type ModeOpt int
|
|
|
|
const (
|
|
// ModeAuto utilizes EvtLocalReachabilityChanged events sent over the event bus to dynamically switch the DHT
|
|
// between Client and Server modes based on network conditions
|
|
ModeAuto ModeOpt = iota
|
|
// ModeClient operates the DHT as a client only, it cannot respond to incoming queries
|
|
ModeClient
|
|
// ModeServer operates the DHT as a server, it can both send and respond to queries
|
|
ModeServer
|
|
)
|
|
|
|
const DefaultPrefix protocol.ID = "/ipfs"
|
|
|
|
// Options is a structure containing all the options that can be used when constructing a DHT.
|
|
type config struct {
|
|
datastore ds.Batching
|
|
validator record.Validator
|
|
mode ModeOpt
|
|
protocolPrefix protocol.ID
|
|
bucketSize int
|
|
disjointPaths int
|
|
concurrency int
|
|
resiliency int
|
|
maxRecordAge time.Duration
|
|
enableProviders bool
|
|
enableValues bool
|
|
|
|
routingTable struct {
|
|
refreshQueryTimeout time.Duration
|
|
refreshPeriod time.Duration
|
|
autoRefresh bool
|
|
latencyTolerance time.Duration
|
|
checkInterval time.Duration
|
|
}
|
|
|
|
// internal parameters, not publicly exposed
|
|
protocols, serverProtocols []protocol.ID
|
|
|
|
// test parameters
|
|
testProtocols []protocol.ID
|
|
}
|
|
|
|
// apply applies the given options to this Option
|
|
func (c *config) apply(opts ...Option) error {
|
|
for i, opt := range opts {
|
|
if err := opt(c); err != nil {
|
|
return fmt.Errorf("dht option %d failed: %s", i, err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Option DHT option type.
|
|
type Option func(*config) error
|
|
|
|
const defaultBucketSize = 20
|
|
|
|
// defaults are the default DHT options. This option will be automatically
|
|
// prepended to any options you pass to the DHT constructor.
|
|
var defaults = func(o *config) error {
|
|
o.validator = record.NamespacedValidator{
|
|
"pk": record.PublicKeyValidator{},
|
|
}
|
|
o.datastore = dssync.MutexWrap(ds.NewMapDatastore())
|
|
o.protocolPrefix = DefaultPrefix
|
|
o.enableProviders = true
|
|
o.enableValues = true
|
|
|
|
o.routingTable.latencyTolerance = time.Minute
|
|
o.routingTable.refreshQueryTimeout = 30 * time.Second
|
|
o.routingTable.refreshPeriod = 10 * time.Minute
|
|
o.routingTable.autoRefresh = true
|
|
o.maxRecordAge = time.Hour * 36
|
|
|
|
o.bucketSize = defaultBucketSize
|
|
o.concurrency = 3
|
|
o.resiliency = 3
|
|
|
|
return nil
|
|
}
|
|
|
|
// applyFallbacks sets default DHT options. It is applied after Defaults and any options passed to the constructor in
|
|
// order to allow for defaults that are based on other set options.
|
|
func (c *config) applyFallbacks() error {
|
|
if c.disjointPaths == 0 {
|
|
c.disjointPaths = c.bucketSize / 2
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *config) validate() error {
|
|
if c.protocolPrefix == DefaultPrefix {
|
|
if c.bucketSize != defaultBucketSize {
|
|
return fmt.Errorf("protocol prefix %s must use bucket size %d", DefaultPrefix, defaultBucketSize)
|
|
}
|
|
if !c.enableProviders {
|
|
return fmt.Errorf("protocol prefix %s must have providers enabled", DefaultPrefix)
|
|
}
|
|
if !c.enableValues {
|
|
return fmt.Errorf("protocol prefix %s must have values enabled", DefaultPrefix)
|
|
}
|
|
if nsval, ok := c.validator.(record.NamespacedValidator); !ok {
|
|
return fmt.Errorf("protocol prefix %s must use a namespaced validator", DefaultPrefix)
|
|
} else if len(nsval) > 2 || nsval["pk"] == nil || nsval["ipns"] == nil {
|
|
return fmt.Errorf("protocol prefix %s must support only the /pk and /ipns namespaces", DefaultPrefix)
|
|
}
|
|
return nil
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// RoutingTableCheckInterval is the interval between two runs of the RT cleanup routine.
|
|
func RoutingTableCheckInterval(i time.Duration) Option {
|
|
return func(c *config) error {
|
|
c.routingTable.checkInterval = i
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// RoutingTableLatencyTolerance sets the maximum acceptable latency for peers
|
|
// in the routing table's cluster.
|
|
func RoutingTableLatencyTolerance(latency time.Duration) Option {
|
|
return func(c *config) error {
|
|
c.routingTable.latencyTolerance = latency
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// RoutingTableRefreshQueryTimeout sets the timeout for routing table refresh
|
|
// queries.
|
|
func RoutingTableRefreshQueryTimeout(timeout time.Duration) Option {
|
|
return func(c *config) error {
|
|
c.routingTable.refreshQueryTimeout = timeout
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// RoutingTableRefreshPeriod sets the period for refreshing buckets in the
|
|
// routing table. The DHT will refresh buckets every period by:
|
|
//
|
|
// 1. First searching for nearby peers to figure out how many buckets we should try to fill.
|
|
// 1. Then searching for a random key in each bucket that hasn't been queried in
|
|
// the last refresh period.
|
|
func RoutingTableRefreshPeriod(period time.Duration) Option {
|
|
return func(c *config) error {
|
|
c.routingTable.refreshPeriod = period
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// Datastore configures the DHT to use the specified datastore.
|
|
//
|
|
// Defaults to an in-memory (temporary) map.
|
|
func Datastore(ds ds.Batching) Option {
|
|
return func(c *config) error {
|
|
c.datastore = ds
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// Mode configures which mode the DHT operates in (Client, Server, Auto).
|
|
//
|
|
// Defaults to ModeAuto.
|
|
func Mode(m ModeOpt) Option {
|
|
return func(c *config) error {
|
|
c.mode = m
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// Validator configures the DHT to use the specified validator.
|
|
//
|
|
// Defaults to a namespaced validator that can only validate public keys.
|
|
func Validator(v record.Validator) Option {
|
|
return func(c *config) error {
|
|
c.validator = v
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// NamespacedValidator adds a validator namespaced under `ns`. This option fails
|
|
// if the DHT is not using a `record.NamespacedValidator` as it's validator (it
|
|
// uses one by default but this can be overridden with the `Validator` option).
|
|
//
|
|
// Example: Given a validator registered as `NamespacedValidator("ipns",
|
|
// myValidator)`, all records with keys starting with `/ipns/` will be validated
|
|
// with `myValidator`.
|
|
func NamespacedValidator(ns string, v record.Validator) Option {
|
|
return func(c *config) error {
|
|
nsval, ok := c.validator.(record.NamespacedValidator)
|
|
if !ok {
|
|
return fmt.Errorf("can only add namespaced validators to a NamespacedValidator")
|
|
}
|
|
nsval[ns] = v
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// ProtocolPrefix sets an application specific prefix to be attached to all DHT protocols. For example,
|
|
// /myapp/kad/1.0.0 instead of /ipfs/kad/1.0.0. Prefix should be of the form /myapp.
|
|
//
|
|
// Defaults to dht.DefaultPrefix
|
|
func ProtocolPrefix(prefix protocol.ID) Option {
|
|
return func(c *config) error {
|
|
c.protocolPrefix = prefix
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// BucketSize configures the bucket size (k in the Kademlia paper) of the routing table.
|
|
//
|
|
// The default value is 20.
|
|
func BucketSize(bucketSize int) Option {
|
|
return func(c *config) error {
|
|
c.bucketSize = bucketSize
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// Concurrency configures the number of concurrent requests (alpha in the Kademlia paper) for a given query path.
|
|
//
|
|
// The default value is 3.
|
|
func Concurrency(alpha int) Option {
|
|
return func(c *config) error {
|
|
c.concurrency = alpha
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// Resiliency configures the number of peers closest to a target that must have responded in order for a given query
|
|
// path to complete.
|
|
//
|
|
// The default value is 3.
|
|
func Resiliency(beta int) Option {
|
|
return func(c *config) error {
|
|
c.resiliency = beta
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// DisjointPaths configures the number of disjoint paths (d in the S/Kademlia paper) taken per query.
|
|
//
|
|
// The default value is BucketSize/2.
|
|
func DisjointPaths(d int) Option {
|
|
return func(c *config) error {
|
|
c.disjointPaths = d
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// MaxRecordAge specifies the maximum time that any node will hold onto a record ("PutValue record")
|
|
// from the time its received. This does not apply to any other forms of validity that
|
|
// the record may contain.
|
|
// For example, a record may contain an ipns entry with an EOL saying its valid
|
|
// until the year 2020 (a great time in the future). For that record to stick around
|
|
// it must be rebroadcasted more frequently than once every 'MaxRecordAge'
|
|
func MaxRecordAge(maxAge time.Duration) Option {
|
|
return func(c *config) error {
|
|
c.maxRecordAge = maxAge
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// DisableAutoRefresh completely disables 'auto-refresh' on the DHT routing
|
|
// table. This means that we will neither refresh the routing table periodically
|
|
// nor when the routing table size goes below the minimum threshold.
|
|
func DisableAutoRefresh() Option {
|
|
return func(c *config) error {
|
|
c.routingTable.autoRefresh = false
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// DisableProviders disables storing and retrieving provider records.
|
|
//
|
|
// Defaults to enabled.
|
|
//
|
|
// WARNING: do not change this unless you're using a forked DHT (i.e., a private
|
|
// network and/or distinct DHT protocols with the `Protocols` option).
|
|
func DisableProviders() Option {
|
|
return func(c *config) error {
|
|
c.enableProviders = false
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// DisableProviders disables storing and retrieving value records (including
|
|
// public keys).
|
|
//
|
|
// Defaults to enabled.
|
|
//
|
|
// WARNING: do not change this unless you're using a forked DHT (i.e., a private
|
|
// network and/or distinct DHT protocols with the `Protocols` option).
|
|
func DisableValues() Option {
|
|
return func(c *config) error {
|
|
c.enableValues = false
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// customProtocols is only to be used for testing. It sets the protocols that the DHT listens on and queries with to be
|
|
// the ones passed in. The custom protocols are still augmented by the Prefix.
|
|
func customProtocols(protos ...protocol.ID) Option {
|
|
return func(c *config) error {
|
|
c.testProtocols = protos
|
|
return nil
|
|
}
|
|
}
|