feat(bootstrap): autobootstrap

1. Auto bootstrap on start.
2. Make `Bootstrap(ctx)` trigger a bootstrap but not _start_ the bootstrapping
   process.
This commit is contained in:
Steven Allen
2019-11-01 00:38:28 -07:00
parent 9c020873ac
commit e2842f0317
2 changed files with 41 additions and 40 deletions

1
dht.go
View File

@@ -109,6 +109,7 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er
h.SetStreamHandler(p, dht.handleNewStream)
}
}
dht.startBootstrapping()
return dht, nil
}

View File

@@ -7,6 +7,8 @@ import (
"sync"
"time"
process "github.com/jbenet/goprocess"
processctx "github.com/jbenet/goprocess/context"
"github.com/libp2p/go-libp2p-core/routing"
"github.com/multiformats/go-multiaddr"
_ "github.com/multiformats/go-multiaddr-dns"
@@ -41,52 +43,47 @@ func init() {
}
}
// BootstrapConfig runs cfg.Queries bootstrap queries every cfg.BucketPeriod.
func (dht *IpfsDHT) Bootstrap(ctx context.Context) error {
triggerBootstrapFnc := func() {
logger.Infof("triggerBootstrapFnc: RT only has %d peers which is less than the min threshold of %d, triggering self & bucket bootstrap",
dht.routingTable.Size(), minRTBootstrapThreshold)
if err := dht.selfWalk(ctx); err != nil {
logger.Warningf("triggerBootstrapFnc: self walk: error: %s", err)
}
if err := dht.bootstrapBuckets(ctx); err != nil {
logger.Warningf("triggerBootstrapFnc: bootstrap buckets: error bootstrapping: %s", err)
}
}
// we should query for self periodically so we can discover closer peers
go func() {
for {
err := dht.selfWalk(ctx)
if err != nil {
logger.Warningf("self walk: error: %s", err)
}
select {
case <-time.After(dht.bootstrapCfg.SelfQueryInterval):
case <-ctx.Done():
return
}
}
}()
// Bootstrap i
func (dht *IpfsDHT) startBootstrapping() error {
// scan the RT table periodically & do a random walk on k-buckets that haven't been queried since the given bucket period
go func() {
dht.proc.Go(func(proc process.Process) {
ctx := processctx.OnClosingContext(proc)
scanInterval := time.NewTicker(dht.bootstrapCfg.RoutingTableScanInterval)
defer scanInterval.Stop()
var (
lastSelfWalk time.Time
walkSelf = true
)
for {
if walkSelf {
walkSelf = false
err := dht.selfWalk(ctx)
if err != nil {
logger.Warningf("self walk: error: %s", err)
} else {
lastSelfWalk = time.Now()
}
}
err := dht.bootstrapBuckets(ctx)
if err != nil {
logger.Warningf("bootstrap buckets: error bootstrapping: %s", err)
}
select {
case <-time.After(dht.bootstrapCfg.RoutingTableScanInterval):
case now := <-scanInterval.C:
// It doesn't make sense to query for self unless we're _also_ going to fill out the routing table.
walkSelf = now.After(lastSelfWalk.Add(dht.bootstrapCfg.SelfQueryInterval))
case <-dht.triggerBootstrap:
triggerBootstrapFnc()
walkSelf = true
logger.Infof("triggering a bootstrap: RT has %d peers", dht.routingTable.Size())
case <-ctx.Done():
return
}
}
}()
})
return nil
}
@@ -166,11 +163,14 @@ func (dht *IpfsDHT) selfWalk(ctx context.Context) error {
return err
}
// synchronous bootstrap.
func (dht *IpfsDHT) bootstrapOnce(ctx context.Context) error {
if err := dht.selfWalk(ctx); err != nil {
return errors.Wrap(err, "failed bootstrap while searching for self")
} else {
return dht.bootstrapBuckets(ctx)
// Bootstrap tells the DHT to get into a bootstrapped state.
//
// Note: the context is ignored.
func (dht *IpfsDHT) Bootstrap(_ context.Context) error {
// Returns an error just in case we want to do that in the future.
select {
case dht.triggerBootstrap <- struct{}{}:
default:
}
return nil
}