Merge pull request #405 from libp2p/feat/simplify-bootstrapping

fix and simplify some bootstrapping logic
This commit is contained in:
Steven Allen 2019-11-06 00:36:08 +00:00 committed by GitHub
commit 8ecf9380a4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 127 additions and 158 deletions

18
dht.go
View File

@ -67,11 +67,10 @@ type IpfsDHT struct {
bucketSize int
bootstrapCfg opts.BootstrapConfig
triggerAutoBootstrap bool
triggerBootstrap chan struct{}
latestSelfWalk time.Time // the last time we looked-up our own peerID in the network
autoRefresh bool
rtRefreshQueryTimeout time.Duration
rtRefreshPeriod time.Duration
triggerRtRefresh chan struct{}
}
// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
@ -92,7 +91,9 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er
return nil, err
}
dht := makeDHT(ctx, h, cfg.Datastore, cfg.Protocols, cfg.BucketSize)
dht.bootstrapCfg = cfg.BootstrapConfig
dht.autoRefresh = cfg.RoutingTable.AutoRefresh
dht.rtRefreshPeriod = cfg.RoutingTable.RefreshPeriod
dht.rtRefreshQueryTimeout = cfg.RoutingTable.RefreshQueryTimeout
// register for network notifs.
dht.host.Network().Notify((*netNotifiee)(dht))
@ -105,14 +106,13 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er
dht.proc.AddChild(dht.providers.Process())
dht.Validator = cfg.Validator
dht.triggerAutoBootstrap = cfg.TriggerAutoBootstrap
if !cfg.Client {
for _, p := range cfg.Protocols {
h.SetStreamHandler(p, dht.handleNewStream)
}
}
dht.startBootstrapping()
dht.startRefreshing()
return dht, nil
}
@ -163,7 +163,7 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p
routingTable: rt,
protocols: protocols,
bucketSize: bucketSize,
triggerBootstrap: make(chan struct{}),
triggerRtRefresh: make(chan struct{}),
}
dht.ctx = dht.newContextWithLocalTags(ctx)

View File

@ -2,9 +2,6 @@ package dht
import (
"context"
"fmt"
"strings"
"sync"
"time"
process "github.com/jbenet/goprocess"
@ -12,12 +9,13 @@ import (
"github.com/libp2p/go-libp2p-core/routing"
"github.com/multiformats/go-multiaddr"
_ "github.com/multiformats/go-multiaddr-dns"
"github.com/pkg/errors"
)
var DefaultBootstrapPeers []multiaddr.Multiaddr
var minRTBootstrapThreshold = 4
// Minimum number of peers in the routing table. If we drop below this and we
// see a new peer, we trigger a bootstrap round.
var minRTRefreshThreshold = 4
func init() {
for _, s := range []string{
@ -43,71 +41,53 @@ func init() {
}
}
// Start the bootstrap worker.
func (dht *IpfsDHT) startBootstrapping() error {
// Start the refresh worker.
func (dht *IpfsDHT) startRefreshing() error {
// scan the RT table periodically & do a random walk on k-buckets that haven't been queried since the given bucket period
dht.proc.Go(func(proc process.Process) {
ctx := processctx.OnClosingContext(proc)
scanInterval := time.NewTicker(dht.bootstrapCfg.BucketPeriod)
defer scanInterval.Stop()
refreshTicker := time.NewTicker(dht.rtRefreshPeriod)
defer refreshTicker.Stop()
// run bootstrap if option is set
if dht.triggerAutoBootstrap {
if err := dht.doBootstrap(ctx, true); err != nil {
logger.Warningf("bootstrap error: %s", err)
}
// refresh if option is set
if dht.autoRefresh {
dht.doRefresh(ctx)
} else {
// disable the "auto-bootstrap" ticker so that no more ticks are sent to this channel
scanInterval.Stop()
// disable the "auto-refresh" ticker so that no more ticks are sent to this channel
refreshTicker.Stop()
}
for {
select {
case now := <-scanInterval.C:
walkSelf := now.After(dht.latestSelfWalk.Add(dht.bootstrapCfg.SelfQueryInterval))
if err := dht.doBootstrap(ctx, walkSelf); err != nil {
logger.Warning("bootstrap error: %s", err)
}
case <-dht.triggerBootstrap:
logger.Infof("triggering a bootstrap: RT has %d peers", dht.routingTable.Size())
if err := dht.doBootstrap(ctx, true); err != nil {
logger.Warning("bootstrap error: %s", err)
}
case <-refreshTicker.C:
case <-dht.triggerRtRefresh:
logger.Infof("triggering a refresh: RT has %d peers", dht.routingTable.Size())
case <-ctx.Done():
return
}
dht.doRefresh(ctx)
}
})
return nil
}
func (dht *IpfsDHT) doBootstrap(ctx context.Context, walkSelf bool) error {
if walkSelf {
if err := dht.selfWalk(ctx); err != nil {
return fmt.Errorf("self walk: error: %s", err)
}
dht.latestSelfWalk = time.Now()
func (dht *IpfsDHT) doRefresh(ctx context.Context) {
dht.selfWalk(ctx)
dht.refreshBuckets(ctx)
}
if err := dht.bootstrapBuckets(ctx); err != nil {
return fmt.Errorf("bootstrap buckets: error bootstrapping: %s", err)
}
return nil
}
// bootstrapBuckets scans the routing table, and does a random walk on k-buckets that haven't been queried since the given bucket period
func (dht *IpfsDHT) bootstrapBuckets(ctx context.Context) error {
// refreshBuckets scans the routing table, and does a random walk on k-buckets that haven't been queried since the given bucket period
func (dht *IpfsDHT) refreshBuckets(ctx context.Context) {
doQuery := func(bucketId int, target string, f func(context.Context) error) error {
logger.Infof("starting bootstrap query for bucket %d to %s (routing table size was %d)",
logger.Infof("starting refreshing bucket %d to %s (routing table size was %d)",
bucketId, target, dht.routingTable.Size())
defer func() {
logger.Infof("finished bootstrap query for bucket %d to %s (routing table size is now %d)",
logger.Infof("finished refreshing bucket %d to %s (routing table size is now %d)",
bucketId, target, dht.routingTable.Size())
}()
queryCtx, cancel := context.WithTimeout(ctx, dht.bootstrapCfg.Timeout)
queryCtx, cancel := context.WithTimeout(ctx, dht.rtRefreshQueryTimeout)
defer cancel()
err := f(queryCtx)
if err == context.DeadlineExceeded && queryCtx.Err() == context.DeadlineExceeded && ctx.Err() == nil {
@ -117,20 +97,22 @@ func (dht *IpfsDHT) bootstrapBuckets(ctx context.Context) error {
}
buckets := dht.routingTable.GetAllBuckets()
var wg sync.WaitGroup
errChan := make(chan error)
if len(buckets) > 16 {
// Don't bother bootstrapping more than 16 buckets.
// GenRandPeerID can't generate target peer IDs with more than
// 16 bits specified anyways.
buckets = buckets[:16]
}
for bucketID, bucket := range buckets {
if time.Since(bucket.RefreshedAt()) > dht.bootstrapCfg.BucketPeriod {
wg.Add(1)
go func(bucketID int, errChan chan<- error) {
defer wg.Done()
if time.Since(bucket.RefreshedAt()) <= dht.rtRefreshPeriod {
continue
}
// gen rand peer in the bucket
randPeerInBucket := dht.routingTable.GenRandPeerID(bucketID)
// walk to the generated peer
walkFnc := func(c context.Context) error {
_, err := dht.FindPeer(ctx, randPeerInBucket)
_, err := dht.FindPeer(c, randPeerInBucket)
if err == routing.ErrNotFound {
return nil
}
@ -138,48 +120,35 @@ func (dht *IpfsDHT) bootstrapBuckets(ctx context.Context) error {
}
if err := doQuery(bucketID, randPeerInBucket.String(), walkFnc); err != nil {
errChan <- errors.Wrapf(err, "failed to do a random walk on bucket %d", bucketID)
logger.Warningf("failed to do a random walk on bucket %d: %s", bucketID, err)
}
}(bucketID, errChan)
}
}
// wait for all walks to finish & close the error channel
go func() {
wg.Wait()
close(errChan)
}()
// accumulate errors from all go-routines. ensures wait group is completed by reading errChan until closure.
var errStrings []string
for err := range errChan {
errStrings = append(errStrings, err.Error())
}
if len(errStrings) == 0 {
return nil
} else {
return fmt.Errorf("errors encountered while running bootstrap on RT:\n%s", strings.Join(errStrings, "\n"))
}
}
// Traverse the DHT toward the self ID
func (dht *IpfsDHT) selfWalk(ctx context.Context) error {
queryCtx, cancel := context.WithTimeout(ctx, dht.bootstrapCfg.Timeout)
func (dht *IpfsDHT) selfWalk(ctx context.Context) {
queryCtx, cancel := context.WithTimeout(ctx, dht.rtRefreshQueryTimeout)
defer cancel()
_, err := dht.FindPeer(queryCtx, dht.self)
if err == routing.ErrNotFound {
return nil
return
}
return err
logger.Warningf("failed to query self during routing table refresh: %s", err)
}
// Bootstrap tells the DHT to get into a bootstrapped state.
// Bootstrap tells the DHT to get into a bootstrapped state satisfying the
// IpfsRouter interface.
//
// Note: the context is ignored.
// This just calls `RefreshRoutingTable`.
func (dht *IpfsDHT) Bootstrap(_ context.Context) error {
select {
case dht.triggerBootstrap <- struct{}{}:
default:
}
dht.RefreshRoutingTable()
return nil
}
// RefreshRoutingTable tells the DHT to refresh it's routing tables.
func (dht *IpfsDHT) RefreshRoutingTable() {
select {
case dht.triggerRtRefresh <- struct{}{}:
default:
}
}

View File

@ -113,7 +113,7 @@ func setupDHT(ctx context.Context, t *testing.T, client bool) *IpfsDHT {
bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)),
opts.Client(client),
opts.NamespacedValidator("v", blankValidator{}),
opts.DisableAutoBootstrap(),
opts.DisableAutoRefresh(),
)
if err != nil {
t.Fatal(err)
@ -191,7 +191,7 @@ func bootstrap(t *testing.T, ctx context.Context, dhts []*IpfsDHT) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
logger.Debugf("Bootstrapping DHTs...")
logger.Debugf("refreshing DHTs routing tables...")
// tried async. sequential fares much better. compare:
// 100 async https://gist.github.com/jbenet/56d12f0578d5f34810b2
@ -201,7 +201,7 @@ func bootstrap(t *testing.T, ctx context.Context, dhts []*IpfsDHT) {
start := rand.Intn(len(dhts)) // randomize to decrease bias.
for i := range dhts {
dht := dhts[(start+i)%len(dhts)]
dht.Bootstrap(ctx)
dht.RefreshRoutingTable()
}
}
@ -639,7 +639,7 @@ func printRoutingTables(dhts []*IpfsDHT) {
}
}
func TestBootstrap(t *testing.T) {
func TestRefresh(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
@ -689,7 +689,7 @@ func TestBootstrap(t *testing.T) {
}
}
func TestBootstrapBelowMinRTThreshold(t *testing.T) {
func TestRefreshBelowMinRTThreshold(t *testing.T) {
ctx := context.Background()
// enable auto bootstrap on A
@ -721,7 +721,7 @@ func TestBootstrapBelowMinRTThreshold(t *testing.T) {
connect(t, ctx, dhtB, dhtC)
// we ONLY init bootstrap on A
dhtA.Bootstrap(ctx)
dhtA.RefreshRoutingTable()
// and wait for one round to complete i.e. A should be connected to both B & C
waitForWellFormedTables(t, []*IpfsDHT{dhtA}, 2, 2, 20*time.Second)
@ -749,7 +749,7 @@ func TestBootstrapBelowMinRTThreshold(t *testing.T) {
assert.Equal(t, dhtE.self, dhtA.routingTable.Find(dhtE.self), "A's routing table should have peer E!")
}
func TestPeriodicBootstrap(t *testing.T) {
func TestPeriodicRefresh(t *testing.T) {
if ci.IsRunning() {
t.Skip("skipping on CI. highly timing dependent")
}
@ -795,7 +795,7 @@ func TestPeriodicBootstrap(t *testing.T) {
t.Logf("bootstrapping them so they find each other. %d", nDHTs)
for _, dht := range dhts {
go dht.Bootstrap(ctx)
dht.RefreshRoutingTable()
}
// this is async, and we dont know when it's finished with one cycle, so keep checking
@ -1428,7 +1428,7 @@ func TestGetSetPluggedProtocol(t *testing.T) {
opts.Protocols("/esh/dht"),
opts.Client(false),
opts.NamespacedValidator("v", blankValidator{}),
opts.DisableAutoBootstrap(),
opts.DisableAutoRefresh(),
}
dhtA, err := New(ctx, bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)), os...)
@ -1467,7 +1467,7 @@ func TestGetSetPluggedProtocol(t *testing.T) {
opts.Protocols("/esh/dht"),
opts.Client(false),
opts.NamespacedValidator("v", blankValidator{}),
opts.DisableAutoBootstrap(),
opts.DisableAutoRefresh(),
}...)
if err != nil {
t.Fatal(err)
@ -1477,7 +1477,7 @@ func TestGetSetPluggedProtocol(t *testing.T) {
opts.Protocols("/lsr/dht"),
opts.Client(false),
opts.NamespacedValidator("v", blankValidator{}),
opts.DisableAutoBootstrap(),
opts.DisableAutoRefresh(),
}...)
if err != nil {
t.Fatal(err)

View File

@ -30,7 +30,7 @@ func TestGetFailures(t *testing.T) {
}
hosts := mn.Hosts()
os := []opts.Option{opts.DisableAutoBootstrap()}
os := []opts.Option{opts.DisableAutoRefresh()}
d, err := New(ctx, hosts[0], os...)
if err != nil {
t.Fatal(err)
@ -152,7 +152,7 @@ func TestNotFound(t *testing.T) {
}
hosts := mn.Hosts()
os := []opts.Option{opts.DisableAutoBootstrap()}
os := []opts.Option{opts.DisableAutoRefresh()}
d, err := New(ctx, hosts[0], os...)
if err != nil {
t.Fatal(err)
@ -232,7 +232,7 @@ func TestLessThanKResponses(t *testing.T) {
}
hosts := mn.Hosts()
os := []opts.Option{opts.DisableAutoBootstrap()}
os := []opts.Option{opts.DisableAutoRefresh()}
d, err := New(ctx, hosts[0], os...)
if err != nil {
t.Fatal(err)
@ -302,7 +302,7 @@ func TestMultipleQueries(t *testing.T) {
t.Fatal(err)
}
hosts := mn.Hosts()
os := []opts.Option{opts.DisableAutoBootstrap()}
os := []opts.Option{opts.DisableAutoRefresh()}
d, err := New(ctx, hosts[0], os...)
if err != nil {
t.Fatal(err)

2
go.mod
View File

@ -4,7 +4,6 @@ go 1.12
require (
github.com/gogo/protobuf v1.3.0
github.com/google/uuid v1.1.1
github.com/hashicorp/golang-lru v0.5.3
github.com/ipfs/go-cid v0.0.3
github.com/ipfs/go-datastore v0.1.0
@ -25,7 +24,6 @@ require (
github.com/multiformats/go-multiaddr v0.0.4
github.com/multiformats/go-multiaddr-dns v0.0.3
github.com/multiformats/go-multistream v0.1.0
github.com/pkg/errors v0.8.1
github.com/stretchr/testify v1.3.0
github.com/whyrusleeping/base32 v0.0.0-20170828182744-c30ac30633cc
go.opencensus.io v0.22.1

2
go.sum
View File

@ -395,8 +395,6 @@ golang.org/x/tools v0.0.0-20181130052023-1c3d964395ce/go.mod h1:n7NCudcB/nEzxVGm
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7 h1:9zdDQZ7Thm29KFXgAX/+yaf3eVbP7djjWp/dXAppNCc=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898 h1:/atklqdjdhuosWIl6AIbOeHJjicWYPqR9bpxqxYG2pA=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=

View File

@ -32,11 +32,11 @@ func (nn *netNotifiee) Connected(n network.Network, v network.Conn) {
dht.plk.Lock()
defer dht.plk.Unlock()
if dht.host.Network().Connectedness(p) == network.Connected {
bootstrap := dht.routingTable.Size() <= minRTBootstrapThreshold
refresh := dht.routingTable.Size() <= minRTRefreshThreshold
dht.Update(dht.Context(), p)
if bootstrap && dht.triggerAutoBootstrap {
if refresh && dht.autoRefresh {
select {
case dht.triggerBootstrap <- struct{}{}:
case dht.triggerRtRefresh <- struct{}{}:
default:
}
}
@ -78,11 +78,11 @@ func (nn *netNotifiee) testConnection(v network.Conn) {
dht.plk.Lock()
defer dht.plk.Unlock()
if dht.host.Network().Connectedness(p) == network.Connected {
bootstrap := dht.routingTable.Size() <= minRTBootstrapThreshold
refresh := dht.routingTable.Size() <= minRTRefreshThreshold
dht.Update(dht.Context(), p)
if bootstrap && dht.triggerAutoBootstrap {
if refresh && dht.autoRefresh {
select {
case dht.triggerBootstrap <- struct{}{}:
case dht.triggerRtRefresh <- struct{}{}:
default:
}
}

View File

@ -19,13 +19,6 @@ var (
DefaultProtocols = []protocol.ID{ProtocolDHT}
)
// BootstrapConfig specifies parameters used for bootstrapping the DHT.
type BootstrapConfig struct {
BucketPeriod time.Duration // how long to wait for a k-bucket to be queried before doing a random walk on it
Timeout time.Duration // how long to wait for a bootstrap query to run
SelfQueryInterval time.Duration // how often to query for self
}
// Options is a structure containing all the options that can be used when constructing a DHT.
type Options struct {
Datastore ds.Batching
@ -33,8 +26,12 @@ type Options struct {
Client bool
Protocols []protocol.ID
BucketSize int
BootstrapConfig BootstrapConfig
TriggerAutoBootstrap bool
RoutingTable struct {
RefreshQueryTimeout time.Duration
RefreshPeriod time.Duration
AutoRefresh bool
}
}
// Apply applies the given options to this Option
@ -59,24 +56,31 @@ var Defaults = func(o *Options) error {
o.Datastore = dssync.MutexWrap(ds.NewMapDatastore())
o.Protocols = DefaultProtocols
o.BootstrapConfig = BootstrapConfig{
// same as that mentioned in the kad dht paper
BucketPeriod: 1 * time.Hour,
Timeout: 10 * time.Second,
SelfQueryInterval: 1 * time.Hour,
}
o.TriggerAutoBootstrap = true
o.RoutingTable.RefreshQueryTimeout = 10 * time.Second
o.RoutingTable.RefreshPeriod = 1 * time.Hour
o.RoutingTable.AutoRefresh = true
return nil
}
// Bootstrap configures the dht bootstrapping process
func Bootstrap(b BootstrapConfig) Option {
// RoutingTableRefreshQueryTimeout sets the timeout for routing table refresh
// queries.
func RoutingTableRefreshQueryTimeout(timeout time.Duration) Option {
return func(o *Options) error {
o.BootstrapConfig = b
o.RoutingTable.RefreshQueryTimeout = timeout
return nil
}
}
// RoutingTableRefreshPeriod sets the period for refreshing buckets in the
// routing table. The DHT will refresh buckets every period by:
//
// 1. First searching for nearby peers to figure out how many buckets we should try to fill.
// 1. Then searching for a random key in each bucket that hasn't been queried in
// the last refresh period.
func RoutingTableRefreshPeriod(period time.Duration) Option {
return func(o *Options) error {
o.RoutingTable.RefreshPeriod = period
return nil
}
}
@ -149,12 +153,12 @@ func BucketSize(bucketSize int) Option {
}
}
// DisableAutoBootstrap completely disables 'auto-bootstrap' on the Dht
// This means that neither will we do periodic bootstrap nor will we
// bootstrap the Dht even if the Routing Table size goes below the minimum threshold
func DisableAutoBootstrap() Option {
// DisableAutoRefresh completely disables 'auto-refresh' on the DHT routing
// table. This means that we will neither refresh the routing table periodically
// nor when the routing table size goes below the minimum threshold.
func DisableAutoRefresh() Option {
return func(o *Options) error {
o.TriggerAutoBootstrap = false
o.RoutingTable.AutoRefresh = false
return nil
}
}