changed bootstrapping logic

This commit is contained in:
Aarsh Shah 2019-08-18 22:21:15 +08:00 committed by Steven Allen
parent fed99afe6e
commit d53dfd6a86
2 changed files with 94 additions and 48 deletions

View File

@ -4,14 +4,16 @@ import (
"context" "context"
"crypto/rand" "crypto/rand"
"fmt" "fmt"
"strings"
"sync"
"time" "time"
u "github.com/ipfs/go-ipfs-util"
"github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/routing" "github.com/libp2p/go-libp2p-core/routing"
u "github.com/ipfs/go-ipfs-util"
"github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multiaddr"
_ "github.com/multiformats/go-multiaddr-dns" _ "github.com/multiformats/go-multiaddr-dns"
"github.com/pkg/errors"
) )
var DefaultBootstrapPeers []multiaddr.Multiaddr var DefaultBootstrapPeers []multiaddr.Multiaddr
@ -40,30 +42,24 @@ func init() {
} }
} }
// BootstrapConfig specifies parameters used bootstrapping the DHT. // BootstrapConfig specifies parameters used for bootstrapping the DHT.
//
// Note there is a tradeoff between the bootstrap period and the
// number of queries. We could support a higher period with less
// queries.
type BootstrapConfig struct { type BootstrapConfig struct {
Queries int // how many queries to run per period BucketPeriod time.Duration // how long to wait for a k-bucket to be queried before doing a random walk on it
Period time.Duration // how often to run periodic bootstrap. Timeout time.Duration // how long to wait for a bootstrap query to run
Timeout time.Duration // how long to wait for a bootstrap query to run RoutingTableScanInterval time.Duration // how often to scan the RT for k-buckets that haven't been queried since the given period
SelfQueryInterval time.Duration // how often to query for self
} }
var DefaultBootstrapConfig = BootstrapConfig{ var DefaultBootstrapConfig = BootstrapConfig{
// For now, this is set to 1 query. // same as that mentioned in the Kademlia DHT paper
// We are currently more interested in ensuring we have a properly formed BucketPeriod: 1 * time.Hour,
// DHT than making sure our dht minimizes traffic. Once we are more certain
// of our implementation's robustness, we should lower this down to 8 or 4.
Queries: 1,
// For now, this is set to 5 minutes, which is a medium period. We are // since the default bucket period is 1 hour, a scan interval of 30 minutes sounds reasonable
// We are currently more interested in ensuring we have a properly formed RoutingTableScanInterval: 30 * time.Minute,
// DHT than making sure our dht minimizes traffic.
Period: time.Duration(5 * time.Minute),
Timeout: time.Duration(10 * time.Second), Timeout: 10 * time.Second,
SelfQueryInterval: 1 * time.Hour,
} }
// A method in the IpfsRouting interface. It calls BootstrapWithConfig with // A method in the IpfsRouting interface. It calls BootstrapWithConfig with
@ -72,13 +68,24 @@ func (dht *IpfsDHT) Bootstrap(ctx context.Context) error {
return dht.BootstrapWithConfig(ctx, DefaultBootstrapConfig) return dht.BootstrapWithConfig(ctx, DefaultBootstrapConfig)
} }
// Runs cfg.Queries bootstrap queries every cfg.Period. // Runs cfg.Queries bootstrap queries every cfg.BucketPeriod.
func (dht *IpfsDHT) BootstrapWithConfig(ctx context.Context, cfg BootstrapConfig) error { func (dht *IpfsDHT) BootstrapWithConfig(ctx context.Context, cfg BootstrapConfig) error {
// Because this method is not synchronous, we have to duplicate sanity // we should query for self periodically so we can discover closer peers
// checks on the config so that callers aren't oblivious. go func() {
if cfg.Queries <= 0 { for {
return fmt.Errorf("invalid number of queries: %d", cfg.Queries) err := dht.BootstrapSelf(ctx)
} if err != nil {
logger.Warningf("error bootstrapping while querying for self: %s", err)
}
select {
case <-time.After(cfg.SelfQueryInterval):
case <-ctx.Done():
return
}
}
}()
// scan the routing table periodically and do a random walk on k-buckets that haven't been queried since the given bucket period
go func() { go func() {
for { for {
err := dht.runBootstrap(ctx, cfg) err := dht.runBootstrap(ctx, cfg)
@ -86,7 +93,7 @@ func (dht *IpfsDHT) BootstrapWithConfig(ctx context.Context, cfg BootstrapConfig
logger.Warningf("error bootstrapping: %s", err) logger.Warningf("error bootstrapping: %s", err)
} }
select { select {
case <-time.After(cfg.Period): case <-time.After(cfg.RoutingTableScanInterval):
case <-ctx.Done(): case <-ctx.Done():
return return
} }
@ -95,15 +102,6 @@ func (dht *IpfsDHT) BootstrapWithConfig(ctx context.Context, cfg BootstrapConfig
return nil return nil
} }
// This is a synchronous bootstrap. cfg.Queries queries will run each with a
// timeout of cfg.Timeout. cfg.Period is not used.
func (dht *IpfsDHT) BootstrapOnce(ctx context.Context, cfg BootstrapConfig) error {
if cfg.Queries <= 0 {
return fmt.Errorf("invalid number of queries: %d", cfg.Queries)
}
return dht.runBootstrap(ctx, cfg)
}
func newRandomPeerId() peer.ID { func newRandomPeerId() peer.ID {
id := make([]byte, 32) // SHA256 is the default. TODO: Use a more canonical way to generate random IDs. id := make([]byte, 32) // SHA256 is the default. TODO: Use a more canonical way to generate random IDs.
rand.Read(id) rand.Read(id)
@ -145,14 +143,14 @@ func (dht *IpfsDHT) selfWalk(ctx context.Context) error {
return err return err
} }
// runBootstrap builds up list of peers by requesting random peer IDs // scan the RT, and do a random walk on k-buckets that haven't been queried since the given bucket period
func (dht *IpfsDHT) runBootstrap(ctx context.Context, cfg BootstrapConfig) error { func (dht *IpfsDHT) runBootstrap(ctx context.Context, cfg BootstrapConfig) error {
doQuery := func(n int, target string, f func(context.Context) error) error { doQuery := func(n int, target string, f func(context.Context) error) error {
logger.Infof("starting bootstrap query (%d/%d) to %s (routing table size was %d)", logger.Infof("starting bootstrap query for bucket %d to %s (routing table size was %d)",
n, cfg.Queries, target, dht.routingTable.Size()) n, target, dht.routingTable.Size())
defer func() { defer func() {
logger.Infof("finished bootstrap query (%d/%d) to %s (routing table size is now %d)", logger.Infof("finished bootstrap query for bucket %d to %s (routing table size is now %d)",
n, cfg.Queries, target, dht.routingTable.Size()) n, target, dht.routingTable.Size())
}() }()
queryCtx, cancel := context.WithTimeout(ctx, cfg.Timeout) queryCtx, cancel := context.WithTimeout(ctx, cfg.Timeout)
defer cancel() defer cancel()
@ -163,16 +161,63 @@ func (dht *IpfsDHT) runBootstrap(ctx context.Context, cfg BootstrapConfig) error
return err return err
} }
// Do all but one of the bootstrap queries as random walks. buckets := dht.routingTable.GetAllBuckets()
for i := 0; i < cfg.Queries; i++ { var wg sync.WaitGroup
err := doQuery(i, "random ID", dht.randomWalk) errChan := make(chan error)
if err != nil {
return err for bucketID, bucket := range buckets {
if time.Since(bucket.LastQueriedAt()) > cfg.BucketPeriod {
wg.Add(1)
go func(bucketID int, errChan chan<- error) {
defer wg.Done()
// gen rand peer in the bucket
randPeerInBucket, err := dht.routingTable.GenRandPeerID(bucketID)
if err != nil {
errChan <- errors.Wrapf(err, "failed to generate random peer ID in bucket %d", bucketID)
return
}
// walk to the generated peer
walkFnc := func(c context.Context) error {
_, err := dht.walk(ctx, randPeerInBucket)
if err == routing.ErrNotFound {
return nil
}
return err
}
if err := doQuery(bucketID, randPeerInBucket.String(), walkFnc); err != nil {
errChan <- errors.Wrapf(err, "failed to do a random walk on bucket %d", bucketID)
}
}(bucketID, errChan)
} }
} }
// Find self to distribute peer info to our neighbors. // wait for all walks to finish & close the error channel
return doQuery(cfg.Queries, fmt.Sprintf("self: %s", dht.self), dht.selfWalk) go func() {
wg.Wait()
close(errChan)
}()
// accumulate errors from all go-routines
var errStrings []string
for err := range errChan {
errStrings = append(errStrings, err.Error())
}
if len(errStrings) == 0 {
return nil
} else {
return fmt.Errorf("errors encountered while running bootstrap on RT: %s", strings.Join(errStrings, "\n"))
}
}
// This is a synchronous bootstrap.
func (dht *IpfsDHT) BootstrapOnce(ctx context.Context, cfg BootstrapConfig) error {
if err := dht.BootstrapSelf(ctx); err != nil {
return errors.Wrap(err, "failed bootstrap while searching for self")
} else {
return dht.runBootstrap(ctx, cfg)
}
} }
func (dht *IpfsDHT) BootstrapRandom(ctx context.Context) error { func (dht *IpfsDHT) BootstrapRandom(ctx context.Context) error {

1
go.mod
View File

@ -24,6 +24,7 @@ require (
github.com/multiformats/go-multiaddr v0.0.4 github.com/multiformats/go-multiaddr v0.0.4
github.com/multiformats/go-multiaddr-dns v0.0.3 github.com/multiformats/go-multiaddr-dns v0.0.3
github.com/multiformats/go-multistream v0.1.0 github.com/multiformats/go-multistream v0.1.0
github.com/pkg/errors v0.8.1
github.com/stretchr/testify v1.3.0 github.com/stretchr/testify v1.3.0
github.com/whyrusleeping/base32 v0.0.0-20170828182744-c30ac30633cc github.com/whyrusleeping/base32 v0.0.0-20170828182744-c30ac30633cc
go.opencensus.io v0.22.1 go.opencensus.io v0.22.1