mirror of
https://github.com/fluencelabs/go-libp2p-kad-dht
synced 2025-04-24 22:32:13 +00:00
feat(bootstrap): simplify bootstrapping
* Rename triggerAutoBootstrap to autoBootstrap. This variable used to control _triggering_ only but now completely disables automatic bootstrapping. * Remove the BootstrapConfig. We introduced this before we switched to functional options. Now that we're breaking the interfaces anyways, we might as well use functional options all the way (easier to extend). * Always query self (feedback from @raulk). * Important: don't abort the bootstrap process if we timeout finding ourselves.
This commit is contained in:
parent
4fd64988b5
commit
ed244cd485
14
dht.go
14
dht.go
@ -67,11 +67,10 @@ type IpfsDHT struct {
|
||||
|
||||
bucketSize int
|
||||
|
||||
bootstrapCfg opts.BootstrapConfig
|
||||
|
||||
triggerAutoBootstrap bool
|
||||
triggerBootstrap chan struct{}
|
||||
latestSelfWalk time.Time // the last time we looked-up our own peerID in the network
|
||||
autoBootstrap bool
|
||||
bootstrapTimeout time.Duration
|
||||
bootstrapPeriod time.Duration
|
||||
triggerBootstrap chan struct{}
|
||||
}
|
||||
|
||||
// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
|
||||
@ -92,7 +91,9 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er
|
||||
return nil, err
|
||||
}
|
||||
dht := makeDHT(ctx, h, cfg.Datastore, cfg.Protocols, cfg.BucketSize)
|
||||
dht.bootstrapCfg = cfg.BootstrapConfig
|
||||
dht.autoBootstrap = cfg.AutoBootstrap
|
||||
dht.bootstrapPeriod = cfg.BootstrapPeriod
|
||||
dht.bootstrapTimeout = cfg.BootstrapTimeout
|
||||
|
||||
// register for network notifs.
|
||||
dht.host.Network().Notify((*netNotifiee)(dht))
|
||||
@ -105,7 +106,6 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er
|
||||
|
||||
dht.proc.AddChild(dht.providers.Process())
|
||||
dht.Validator = cfg.Validator
|
||||
dht.triggerAutoBootstrap = cfg.TriggerAutoBootstrap
|
||||
|
||||
if !cfg.Client {
|
||||
for _, p := range cfg.Protocols {
|
||||
|
@ -49,14 +49,12 @@ func (dht *IpfsDHT) startBootstrapping() error {
|
||||
dht.proc.Go(func(proc process.Process) {
|
||||
ctx := processctx.OnClosingContext(proc)
|
||||
|
||||
scanInterval := time.NewTicker(dht.bootstrapCfg.BucketPeriod)
|
||||
scanInterval := time.NewTicker(dht.bootstrapPeriod)
|
||||
defer scanInterval.Stop()
|
||||
|
||||
// run bootstrap if option is set
|
||||
if dht.triggerAutoBootstrap {
|
||||
if err := dht.doBootstrap(ctx, true); err != nil {
|
||||
logger.Warningf("bootstrap error: %s", err)
|
||||
}
|
||||
if dht.autoBootstrap {
|
||||
dht.doBootstrap(ctx)
|
||||
} else {
|
||||
// disable the "auto-bootstrap" ticker so that no more ticks are sent to this channel
|
||||
scanInterval.Stop()
|
||||
@ -64,38 +62,26 @@ func (dht *IpfsDHT) startBootstrapping() error {
|
||||
|
||||
for {
|
||||
select {
|
||||
case now := <-scanInterval.C:
|
||||
walkSelf := now.After(dht.latestSelfWalk.Add(dht.bootstrapCfg.SelfQueryInterval))
|
||||
if err := dht.doBootstrap(ctx, walkSelf); err != nil {
|
||||
logger.Warning("bootstrap error: %s", err)
|
||||
}
|
||||
case <-scanInterval.C:
|
||||
case <-dht.triggerBootstrap:
|
||||
logger.Infof("triggering a bootstrap: RT has %d peers", dht.routingTable.Size())
|
||||
if err := dht.doBootstrap(ctx, true); err != nil {
|
||||
logger.Warning("bootstrap error: %s", err)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
dht.doBootstrap(ctx)
|
||||
}
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dht *IpfsDHT) doBootstrap(ctx context.Context, walkSelf bool) error {
|
||||
if walkSelf {
|
||||
if err := dht.selfWalk(ctx); err != nil {
|
||||
return fmt.Errorf("self walk: error: %s", err)
|
||||
}
|
||||
dht.latestSelfWalk = time.Now()
|
||||
func (dht *IpfsDHT) doBootstrap(ctx context.Context) {
|
||||
if err := dht.selfWalk(ctx); err != nil {
|
||||
logger.Warningf("error while bootstrapping self: %s", err)
|
||||
}
|
||||
|
||||
if err := dht.bootstrapBuckets(ctx); err != nil {
|
||||
return fmt.Errorf("bootstrap buckets: error bootstrapping: %s", err)
|
||||
logger.Warningf("error while bootstrapping buckets: %s", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// bootstrapBuckets scans the routing table, and does a random walk on k-buckets that haven't been queried since the given bucket period
|
||||
@ -107,7 +93,7 @@ func (dht *IpfsDHT) bootstrapBuckets(ctx context.Context) error {
|
||||
logger.Infof("finished bootstrap query for bucket %d to %s (routing table size is now %d)",
|
||||
bucketId, target, dht.routingTable.Size())
|
||||
}()
|
||||
queryCtx, cancel := context.WithTimeout(ctx, dht.bootstrapCfg.Timeout)
|
||||
queryCtx, cancel := context.WithTimeout(ctx, dht.bootstrapTimeout)
|
||||
defer cancel()
|
||||
err := f(queryCtx)
|
||||
if err == context.DeadlineExceeded && queryCtx.Err() == context.DeadlineExceeded && ctx.Err() == nil {
|
||||
@ -121,7 +107,7 @@ func (dht *IpfsDHT) bootstrapBuckets(ctx context.Context) error {
|
||||
errChan := make(chan error)
|
||||
|
||||
for bucketID, bucket := range buckets {
|
||||
if time.Since(bucket.RefreshedAt()) > dht.bootstrapCfg.BucketPeriod {
|
||||
if time.Since(bucket.RefreshedAt()) > dht.bootstrapPeriod {
|
||||
wg.Add(1)
|
||||
go func(bucketID int, errChan chan<- error) {
|
||||
defer wg.Done()
|
||||
@ -164,7 +150,7 @@ func (dht *IpfsDHT) bootstrapBuckets(ctx context.Context) error {
|
||||
|
||||
// Traverse the DHT toward the self ID
|
||||
func (dht *IpfsDHT) selfWalk(ctx context.Context) error {
|
||||
queryCtx, cancel := context.WithTimeout(ctx, dht.bootstrapCfg.Timeout)
|
||||
queryCtx, cancel := context.WithTimeout(ctx, dht.bootstrapTimeout)
|
||||
defer cancel()
|
||||
_, err := dht.FindPeer(queryCtx, dht.self)
|
||||
if err == routing.ErrNotFound {
|
||||
|
4
notif.go
4
notif.go
@ -34,7 +34,7 @@ func (nn *netNotifiee) Connected(n network.Network, v network.Conn) {
|
||||
if dht.host.Network().Connectedness(p) == network.Connected {
|
||||
bootstrap := dht.routingTable.Size() <= minRTBootstrapThreshold
|
||||
dht.Update(dht.Context(), p)
|
||||
if bootstrap && dht.triggerAutoBootstrap {
|
||||
if bootstrap && dht.autoBootstrap {
|
||||
select {
|
||||
case dht.triggerBootstrap <- struct{}{}:
|
||||
default:
|
||||
@ -80,7 +80,7 @@ func (nn *netNotifiee) testConnection(v network.Conn) {
|
||||
if dht.host.Network().Connectedness(p) == network.Connected {
|
||||
bootstrap := dht.routingTable.Size() <= minRTBootstrapThreshold
|
||||
dht.Update(dht.Context(), p)
|
||||
if bootstrap && dht.triggerAutoBootstrap {
|
||||
if bootstrap && dht.autoBootstrap {
|
||||
select {
|
||||
case dht.triggerBootstrap <- struct{}{}:
|
||||
default:
|
||||
|
@ -19,22 +19,17 @@ var (
|
||||
DefaultProtocols = []protocol.ID{ProtocolDHT}
|
||||
)
|
||||
|
||||
// BootstrapConfig specifies parameters used for bootstrapping the DHT.
|
||||
type BootstrapConfig struct {
|
||||
BucketPeriod time.Duration // how long to wait for a k-bucket to be queried before doing a random walk on it
|
||||
Timeout time.Duration // how long to wait for a bootstrap query to run
|
||||
SelfQueryInterval time.Duration // how often to query for self
|
||||
}
|
||||
|
||||
// Options is a structure containing all the options that can be used when constructing a DHT.
|
||||
type Options struct {
|
||||
Datastore ds.Batching
|
||||
Validator record.Validator
|
||||
Client bool
|
||||
Protocols []protocol.ID
|
||||
BucketSize int
|
||||
BootstrapConfig BootstrapConfig
|
||||
TriggerAutoBootstrap bool
|
||||
Datastore ds.Batching
|
||||
Validator record.Validator
|
||||
Client bool
|
||||
Protocols []protocol.ID
|
||||
BucketSize int
|
||||
|
||||
BootstrapTimeout time.Duration
|
||||
BootstrapPeriod time.Duration
|
||||
AutoBootstrap bool
|
||||
}
|
||||
|
||||
// Apply applies the given options to this Option
|
||||
@ -59,24 +54,30 @@ var Defaults = func(o *Options) error {
|
||||
o.Datastore = dssync.MutexWrap(ds.NewMapDatastore())
|
||||
o.Protocols = DefaultProtocols
|
||||
|
||||
o.BootstrapConfig = BootstrapConfig{
|
||||
// same as that mentioned in the kad dht paper
|
||||
BucketPeriod: 1 * time.Hour,
|
||||
|
||||
Timeout: 10 * time.Second,
|
||||
|
||||
SelfQueryInterval: 1 * time.Hour,
|
||||
}
|
||||
|
||||
o.TriggerAutoBootstrap = true
|
||||
o.BootstrapTimeout = 10 * time.Second
|
||||
o.BootstrapPeriod = 1 * time.Hour
|
||||
o.AutoBootstrap = true
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Bootstrap configures the dht bootstrapping process
|
||||
func Bootstrap(b BootstrapConfig) Option {
|
||||
// BootstrapTimeout sets the timeout for bootstrap queries.
|
||||
func BootstrapTimeout(timeout time.Duration) Option {
|
||||
return func(o *Options) error {
|
||||
o.BootstrapConfig = b
|
||||
o.BootstrapTimeout = timeout
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// BootstrapPeriod sets the period for bootstrapping. The DHT will bootstrap
|
||||
// every bootstrap period by:
|
||||
//
|
||||
// 1. First searching for nearby peers to figure out how many buckets we should try to fill.
|
||||
// 1. Then searching for a random key in each bucket that hasn't been queried in
|
||||
// the last bootstrap period.
|
||||
func BootstrapPeriod(period time.Duration) Option {
|
||||
return func(o *Options) error {
|
||||
o.BootstrapPeriod = period
|
||||
return nil
|
||||
}
|
||||
}
|
||||
@ -154,7 +155,7 @@ func BucketSize(bucketSize int) Option {
|
||||
// bootstrap the Dht even if the Routing Table size goes below the minimum threshold
|
||||
func DisableAutoBootstrap() Option {
|
||||
return func(o *Options) error {
|
||||
o.TriggerAutoBootstrap = false
|
||||
o.AutoBootstrap = false
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user