From d53dfd6a86342f5ee6a0c1c8b07853c99f415e08 Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Sun, 18 Aug 2019 22:21:15 +0800 Subject: [PATCH 1/9] changed bootstrapping logic --- dht_bootstrap.go | 141 +++++++++++++++++++++++++++++++---------------- go.mod | 1 + 2 files changed, 94 insertions(+), 48 deletions(-) diff --git a/dht_bootstrap.go b/dht_bootstrap.go index 0dd1884..325caa5 100644 --- a/dht_bootstrap.go +++ b/dht_bootstrap.go @@ -4,14 +4,16 @@ import ( "context" "crypto/rand" "fmt" + "strings" + "sync" "time" + u "github.com/ipfs/go-ipfs-util" "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/routing" - - u "github.com/ipfs/go-ipfs-util" "github.com/multiformats/go-multiaddr" _ "github.com/multiformats/go-multiaddr-dns" + "github.com/pkg/errors" ) var DefaultBootstrapPeers []multiaddr.Multiaddr @@ -40,30 +42,24 @@ func init() { } } -// BootstrapConfig specifies parameters used bootstrapping the DHT. -// -// Note there is a tradeoff between the bootstrap period and the -// number of queries. We could support a higher period with less -// queries. +// BootstrapConfig specifies parameters used for bootstrapping the DHT. type BootstrapConfig struct { - Queries int // how many queries to run per period - Period time.Duration // how often to run periodic bootstrap. - Timeout time.Duration // how long to wait for a bootstrap query to run + BucketPeriod time.Duration // how long to wait for a k-bucket to be queried before doing a random walk on it + Timeout time.Duration // how long to wait for a bootstrap query to run + RoutingTableScanInterval time.Duration // how often to scan the RT for k-buckets that haven't been queried since the given period + SelfQueryInterval time.Duration // how often to query for self } var DefaultBootstrapConfig = BootstrapConfig{ - // For now, this is set to 1 query. - // We are currently more interested in ensuring we have a properly formed - // DHT than making sure our dht minimizes traffic. Once we are more certain - // of our implementation's robustness, we should lower this down to 8 or 4. - Queries: 1, + // same as that mentioned in the kad dht paper + BucketPeriod: 1 * time.Hour, - // For now, this is set to 5 minutes, which is a medium period. We are - // We are currently more interested in ensuring we have a properly formed - // DHT than making sure our dht minimizes traffic. - Period: time.Duration(5 * time.Minute), + // since the default bucket period is 1 hour, a scan interval of 30 minutes sounds reasonable + RoutingTableScanInterval: 30 * time.Minute, - Timeout: time.Duration(10 * time.Second), + Timeout: 10 * time.Second, + + SelfQueryInterval: 1 * time.Hour, } // A method in the IpfsRouting interface. It calls BootstrapWithConfig with @@ -72,13 +68,24 @@ func (dht *IpfsDHT) Bootstrap(ctx context.Context) error { return dht.BootstrapWithConfig(ctx, DefaultBootstrapConfig) } -// Runs cfg.Queries bootstrap queries every cfg.Period. +// Runs cfg.Queries bootstrap queries every cfg.BucketPeriod. func (dht *IpfsDHT) BootstrapWithConfig(ctx context.Context, cfg BootstrapConfig) error { - // Because this method is not synchronous, we have to duplicate sanity - // checks on the config so that callers aren't oblivious. 
- if cfg.Queries <= 0 { - return fmt.Errorf("invalid number of queries: %d", cfg.Queries) - } + // we should query for self periodically so we can discover closer peers + go func() { + for { + err := dht.BootstrapSelf(ctx) + if err != nil { + logger.Warningf("error bootstrapping while querying for self: %s", err) + } + select { + case <-time.After(cfg.SelfQueryInterval): + case <-ctx.Done(): + return + } + } + }() + + // scan the RT table periodically & do a random walk on k-buckets that haven't been queried since the given bucket period go func() { for { err := dht.runBootstrap(ctx, cfg) @@ -86,7 +93,7 @@ func (dht *IpfsDHT) BootstrapWithConfig(ctx context.Context, cfg BootstrapConfig logger.Warningf("error bootstrapping: %s", err) } select { - case <-time.After(cfg.Period): + case <-time.After(cfg.RoutingTableScanInterval): case <-ctx.Done(): return } @@ -95,15 +102,6 @@ func (dht *IpfsDHT) BootstrapWithConfig(ctx context.Context, cfg BootstrapConfig return nil } -// This is a synchronous bootstrap. cfg.Queries queries will run each with a -// timeout of cfg.Timeout. cfg.Period is not used. -func (dht *IpfsDHT) BootstrapOnce(ctx context.Context, cfg BootstrapConfig) error { - if cfg.Queries <= 0 { - return fmt.Errorf("invalid number of queries: %d", cfg.Queries) - } - return dht.runBootstrap(ctx, cfg) -} - func newRandomPeerId() peer.ID { id := make([]byte, 32) // SHA256 is the default. TODO: Use a more canonical way to generate random IDs. rand.Read(id) @@ -145,14 +143,14 @@ func (dht *IpfsDHT) selfWalk(ctx context.Context) error { return err } -// runBootstrap builds up list of peers by requesting random peer IDs +//scan the RT,& do a random walk on k-buckets that haven't been queried since the given bucket period func (dht *IpfsDHT) runBootstrap(ctx context.Context, cfg BootstrapConfig) error { doQuery := func(n int, target string, f func(context.Context) error) error { - logger.Infof("starting bootstrap query (%d/%d) to %s (routing table size was %d)", - n, cfg.Queries, target, dht.routingTable.Size()) + logger.Infof("starting bootstrap query for bucket %d to %s (routing table size was %d)", + n, target, dht.routingTable.Size()) defer func() { - logger.Infof("finished bootstrap query (%d/%d) to %s (routing table size is now %d)", - n, cfg.Queries, target, dht.routingTable.Size()) + logger.Infof("finished bootstrap query for bucket %d to %s (routing table size is now %d)", + n, target, dht.routingTable.Size()) }() queryCtx, cancel := context.WithTimeout(ctx, cfg.Timeout) defer cancel() @@ -163,16 +161,63 @@ func (dht *IpfsDHT) runBootstrap(ctx context.Context, cfg BootstrapConfig) error return err } - // Do all but one of the bootstrap queries as random walks. 
- for i := 0; i < cfg.Queries; i++ { - err := doQuery(i, "random ID", dht.randomWalk) - if err != nil { - return err + buckets := dht.routingTable.GetAllBuckets() + var wg sync.WaitGroup + errChan := make(chan error) + + for bucketID, bucket := range buckets { + if time.Since(bucket.LastQueriedAt()) > cfg.BucketPeriod { + wg.Add(1) + go func(bucketID int, errChan chan<- error) { + defer wg.Done() + // gen rand peer in the bucket + randPeerInBucket, err := dht.routingTable.GenRandPeerID(bucketID) + if err != nil { + errChan <- errors.Wrapf(err, "failed to generate random peer ID in bucket %d", bucketID) + return + } + + // walk to the generated peer + walkFnc := func(c context.Context) error { + _, err := dht.walk(ctx, randPeerInBucket) + if err == routing.ErrNotFound { + return nil + } + return err + } + + if err := doQuery(bucketID, randPeerInBucket.String(), walkFnc); err != nil { + errChan <- errors.Wrapf(err, "failed to do a random walk on bucket %d", bucketID) + } + }(bucketID, errChan) } } - // Find self to distribute peer info to our neighbors. - return doQuery(cfg.Queries, fmt.Sprintf("self: %s", dht.self), dht.selfWalk) + // wait for all walks to finish & close the error channel + go func() { + wg.Wait() + close(errChan) + }() + + // accumulate errors from all go-routines + var errStrings []string + for err := range errChan { + errStrings = append(errStrings, err.Error()) + } + if len(errStrings) == 0 { + return nil + } else { + return fmt.Errorf("errors encountered while running bootstrap on RT: %s", strings.Join(errStrings, "\n")) + } +} + +// This is a synchronous bootstrap. +func (dht *IpfsDHT) BootstrapOnce(ctx context.Context, cfg BootstrapConfig) error { + if err := dht.BootstrapSelf(ctx); err != nil { + return errors.Wrap(err, "failed bootstrap while searching for self") + } else { + return dht.runBootstrap(ctx, cfg) + } } func (dht *IpfsDHT) BootstrapRandom(ctx context.Context) error { diff --git a/go.mod b/go.mod index 55b7713..fa6cda3 100644 --- a/go.mod +++ b/go.mod @@ -24,6 +24,7 @@ require ( github.com/multiformats/go-multiaddr v0.0.4 github.com/multiformats/go-multiaddr-dns v0.0.3 github.com/multiformats/go-multistream v0.1.0 + github.com/pkg/errors v0.8.1 github.com/stretchr/testify v1.3.0 github.com/whyrusleeping/base32 v0.0.0-20170828182744-c30ac30633cc go.opencensus.io v0.22.1 From f7353aac3b9cd76b16cb7822bb9d40dba763bfb6 Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Mon, 19 Aug 2019 09:00:49 +0800 Subject: [PATCH 2/9] reset timer on bucket --- dht.go | 18 +++++++++++------- dht_bootstrap.go | 2 +- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/dht.go b/dht.go index 66e0508..7032055 100644 --- a/dht.go +++ b/dht.go @@ -21,18 +21,18 @@ import ( "github.com/libp2p/go-libp2p-kad-dht/metrics" opts "github.com/libp2p/go-libp2p-kad-dht/opts" pb "github.com/libp2p/go-libp2p-kad-dht/pb" - providers "github.com/libp2p/go-libp2p-kad-dht/providers" + "github.com/libp2p/go-libp2p-kad-dht/providers" - proto "github.com/gogo/protobuf/proto" - cid "github.com/ipfs/go-cid" + "github.com/gogo/protobuf/proto" + "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" logging "github.com/ipfs/go-log" - goprocess "github.com/jbenet/goprocess" - goprocessctx "github.com/jbenet/goprocess/context" + "github.com/jbenet/goprocess" + "github.com/jbenet/goprocess/context" kb "github.com/libp2p/go-libp2p-kbucket" - record "github.com/libp2p/go-libp2p-record" + "github.com/libp2p/go-libp2p-record" recpb "github.com/libp2p/go-libp2p-record/pb" - base32 
"github.com/whyrusleeping/base32" + "github.com/whyrusleeping/base32" ) var logger = logging.Logger("dht") @@ -307,6 +307,10 @@ func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) ( resp, err := dht.sendRequest(ctx, p, pmes) switch err { case nil: + // reset the timer for the k-bucket we just searched in ONLY if there was no error + // so that we can retry during the next bootstrap + bucket := dht.routingTable.BucketForPeer(id) + bucket.ResetLastQueriedAt(time.Now()) return resp, nil case ErrReadTimeout: logger.Warningf("read timeout: %s %s", p.Pretty(), id) diff --git a/dht_bootstrap.go b/dht_bootstrap.go index 325caa5..dee52b7 100644 --- a/dht_bootstrap.go +++ b/dht_bootstrap.go @@ -75,7 +75,7 @@ func (dht *IpfsDHT) BootstrapWithConfig(ctx context.Context, cfg BootstrapConfig for { err := dht.BootstrapSelf(ctx) if err != nil { - logger.Warningf("error bootstrapping while querying for self: %s", err) + logger.Warningf("error bootstrapping while searching for my self (I'm Too Shallow ?): %s", err) } select { case <-time.After(cfg.SelfQueryInterval): From 5329454a0f4155724ef6c9f07172e76f89dbbe10 Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Mon, 19 Aug 2019 09:17:43 +0800 Subject: [PATCH 3/9] removed cfg.Queries param from bootstrap --- dht_test.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/dht_test.go b/dht_test.go index 681f90c..0e27ec2 100644 --- a/dht_test.go +++ b/dht_test.go @@ -198,12 +198,11 @@ func bootstrap(t *testing.T, ctx context.Context, dhts []*IpfsDHT) { // probably because results compound cfg := DefaultBootstrapConfig - cfg.Queries = 3 start := rand.Intn(len(dhts)) // randomize to decrease bias. for i := range dhts { dht := dhts[(start+i)%len(dhts)] - dht.runBootstrap(ctx, cfg) + dht.BootstrapOnce(ctx, cfg) } } @@ -712,7 +711,6 @@ func TestPeriodicBootstrap(t *testing.T) { }() cfg := DefaultBootstrapConfig - cfg.Queries = 5 t.Logf("dhts are not connected. 
%d", nDHTs) for _, dht := range dhts { From 585d6725bef385d3d76f4635622d0a69e8ad10a3 Mon Sep 17 00:00:00 2001 From: aarshkshah1992 Date: Mon, 26 Aug 2019 13:57:32 +0800 Subject: [PATCH 4/9] refresh a bucket whenever we lookup a key in it --- dht.go | 4 ---- routing.go | 39 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 38 insertions(+), 5 deletions(-) diff --git a/dht.go b/dht.go index 7032055..c28294c 100644 --- a/dht.go +++ b/dht.go @@ -307,10 +307,6 @@ func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) ( resp, err := dht.sendRequest(ctx, p, pmes) switch err { case nil: - // reset the timer for the k-bucket we just searched in ONLY if there was no error - // so that we can retry during the next bootstrap - bucket := dht.routingTable.BucketForPeer(id) - bucket.ResetLastQueriedAt(time.Now()) return resp, nil case ErrReadTimeout: logger.Warningf("read timeout: %s %s", p.Pretty(), id) diff --git a/routing.go b/routing.go index 859e966..06dd9eb 100644 --- a/routing.go +++ b/routing.go @@ -13,7 +13,7 @@ import ( "github.com/libp2p/go-libp2p-core/peerstore" "github.com/libp2p/go-libp2p-core/routing" - cid "github.com/ipfs/go-cid" + "github.com/ipfs/go-cid" u "github.com/ipfs/go-ipfs-util" logging "github.com/ipfs/go-log" pb "github.com/libp2p/go-libp2p-kad-dht/pb" @@ -74,6 +74,11 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts return err } + // refresh the k-bucket containing this key + defer func() { + dht.routingTable.BucketForID(kb.ConvertKey(key)).ResetRefreshedAt(time.Now()) + }() + pchan, err := dht.GetClosestPeers(ctx, key) if err != nil { return err @@ -98,6 +103,7 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts }(p) } wg.Wait() + return nil } @@ -157,6 +163,11 @@ func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing responsesNeeded = getQuorum(&cfg, -1) } + // refresh the k-bucket containing this key + defer func() { + dht.routingTable.BucketForID(kb.ConvertKey(key)).ResetRefreshedAt(time.Now()) + }() + valCh, err := dht.getValues(ctx, key, responsesNeeded) if err != nil { return nil, err @@ -254,6 +265,11 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []R eip.Append(loggableKey(key)) defer eip.Done() + // refresh the k-bucket containing this key + defer func() { + dht.routingTable.BucketForID(kb.ConvertKey(key)).ResetRefreshedAt(time.Now()) + }() + valCh, err := dht.getValues(ctx, key, nvals) if err != nil { eip.SetError(err) @@ -429,6 +445,11 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err defer cancel() } + // refresh the k-bucket containing this key + defer func() { + dht.routingTable.BucketForID(kb.ConvertKey(key.KeyString())).ResetRefreshedAt(time.Now()) + }() + peers, err := dht.GetClosestPeers(closerCtx, key.KeyString()) if err != nil { return err @@ -486,6 +507,12 @@ func (dht *IpfsDHT) FindProviders(ctx context.Context, c cid.Cid) ([]peer.AddrIn func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo { logger.Event(ctx, "findProviders", key) peerOut := make(chan peer.AddrInfo, count) + + // refresh the k-bucket containing this key + defer func() { + dht.routingTable.BucketForID(kb.ConvertKey(key.KeyString())).ResetRefreshedAt(time.Now()) + }() + go dht.findProvidersAsyncRoutine(ctx, key, count, peerOut) return peerOut } @@ -621,6 +648,11 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, } } + 
// refresh the k-bucket containing this key + defer func() { + dht.routingTable.BucketForID(kb.ConvertPeerID(id)).ResetRefreshedAt(time.Now()) + }() + // setup the Query parent := ctx query := dht.newQuery(string(id), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) { @@ -682,6 +714,11 @@ func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (< return nil, kb.ErrLookupFailure } + // refresh the k-bucket containing this key + defer func() { + dht.routingTable.BucketForID(kb.ConvertPeerID(id)).ResetRefreshedAt(time.Now()) + }() + // setup the Query query := dht.newQuery(string(id), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) { From 14545294e37ec3956830d93e2ce4bd7c290a6e6b Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Thu, 29 Aug 2019 16:08:58 -0700 Subject: [PATCH 5/9] dep: update go-libp2p-kbucket --- dht_bootstrap.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/dht_bootstrap.go b/dht_bootstrap.go index dee52b7..5a596bf 100644 --- a/dht_bootstrap.go +++ b/dht_bootstrap.go @@ -166,16 +166,12 @@ func (dht *IpfsDHT) runBootstrap(ctx context.Context, cfg BootstrapConfig) error errChan := make(chan error) for bucketID, bucket := range buckets { - if time.Since(bucket.LastQueriedAt()) > cfg.BucketPeriod { + if time.Since(bucket.RefreshedAt()) > cfg.BucketPeriod { wg.Add(1) go func(bucketID int, errChan chan<- error) { defer wg.Done() // gen rand peer in the bucket - randPeerInBucket, err := dht.routingTable.GenRandPeerID(bucketID) - if err != nil { - errChan <- errors.Wrapf(err, "failed to generate random peer ID in bucket %d", bucketID) - return - } + randPeerInBucket := dht.routingTable.GenRandPeerID(bucketID) // walk to the generated peer walkFnc := func(c context.Context) error { From 232357eab21581416f6c8504864a11efa9b654fa Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Thu, 29 Aug 2019 16:10:02 -0700 Subject: [PATCH 6/9] chore: smaller diff --- dht.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/dht.go b/dht.go index c28294c..66e0508 100644 --- a/dht.go +++ b/dht.go @@ -21,18 +21,18 @@ import ( "github.com/libp2p/go-libp2p-kad-dht/metrics" opts "github.com/libp2p/go-libp2p-kad-dht/opts" pb "github.com/libp2p/go-libp2p-kad-dht/pb" - "github.com/libp2p/go-libp2p-kad-dht/providers" + providers "github.com/libp2p/go-libp2p-kad-dht/providers" - "github.com/gogo/protobuf/proto" - "github.com/ipfs/go-cid" + proto "github.com/gogo/protobuf/proto" + cid "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" logging "github.com/ipfs/go-log" - "github.com/jbenet/goprocess" - "github.com/jbenet/goprocess/context" + goprocess "github.com/jbenet/goprocess" + goprocessctx "github.com/jbenet/goprocess/context" kb "github.com/libp2p/go-libp2p-kbucket" - "github.com/libp2p/go-libp2p-record" + record "github.com/libp2p/go-libp2p-record" recpb "github.com/libp2p/go-libp2p-record/pb" - "github.com/whyrusleeping/base32" + base32 "github.com/whyrusleeping/base32" ) var logger = logging.Logger("dht") From 5da16344fe4c4e1176d825aaa03132813fe84b22 Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Fri, 30 Aug 2019 16:18:50 +0800 Subject: [PATCH 7/9] refresh bucket only when query is successful --- lookup.go | 5 ++++- routing.go | 47 ++++++++++++----------------------------------- 2 files changed, 16 insertions(+), 36 deletions(-) diff --git a/lookup.go b/lookup.go index 41ef735..a00fbfb 100644 --- a/lookup.go +++ b/lookup.go @@ -8,7 +8,7 @@ import ( "github.com/libp2p/go-libp2p-core/peer" - 
cid "github.com/ipfs/go-cid" + "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log" pb "github.com/libp2p/go-libp2p-kad-dht/pb" kb "github.com/libp2p/go-libp2p-kbucket" @@ -103,6 +103,9 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee } if res != nil && res.queriedSet != nil { + // refresh the k-bucket containing this key as the query was successful + dht.routingTable.BucketForID(kb.ConvertKey(key)).ResetRefreshedAt(time.Now()) + sorted := kb.SortClosestPeers(res.queriedSet.Peers(), kb.ConvertKey(key)) l := len(sorted) if l > dht.bucketSize { diff --git a/routing.go b/routing.go index 06dd9eb..1cfe293 100644 --- a/routing.go +++ b/routing.go @@ -74,11 +74,6 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts return err } - // refresh the k-bucket containing this key - defer func() { - dht.routingTable.BucketForID(kb.ConvertKey(key)).ResetRefreshedAt(time.Now()) - }() - pchan, err := dht.GetClosestPeers(ctx, key) if err != nil { return err @@ -163,11 +158,6 @@ func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing responsesNeeded = getQuorum(&cfg, -1) } - // refresh the k-bucket containing this key - defer func() { - dht.routingTable.BucketForID(kb.ConvertKey(key)).ResetRefreshedAt(time.Now()) - }() - valCh, err := dht.getValues(ctx, key, responsesNeeded) if err != nil { return nil, err @@ -265,11 +255,6 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []R eip.Append(loggableKey(key)) defer eip.Done() - // refresh the k-bucket containing this key - defer func() { - dht.routingTable.BucketForID(kb.ConvertKey(key)).ResetRefreshedAt(time.Now()) - }() - valCh, err := dht.getValues(ctx, key, nvals) if err != nil { eip.SetError(err) @@ -396,6 +381,9 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, nvals int) (<-cha // // We'll just call this a success. if got > 0 && (err == routing.ErrNotFound || reqCtx.Err() == context.DeadlineExceeded) { + // refresh the k-bucket containing this key as the query was successful + dht.routingTable.BucketForID(kb.ConvertKey(key)).ResetRefreshedAt(time.Now()) + err = nil } done(err) @@ -445,11 +433,6 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key cid.Cid, brdcst bool) (err defer cancel() } - // refresh the k-bucket containing this key - defer func() { - dht.routingTable.BucketForID(kb.ConvertKey(key.KeyString())).ResetRefreshedAt(time.Now()) - }() - peers, err := dht.GetClosestPeers(closerCtx, key.KeyString()) if err != nil { return err @@ -508,11 +491,6 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count i logger.Event(ctx, "findProviders", key) peerOut := make(chan peer.AddrInfo, count) - // refresh the k-bucket containing this key - defer func() { - dht.routingTable.BucketForID(kb.ConvertKey(key.KeyString())).ResetRefreshedAt(time.Now()) - }() - go dht.findProvidersAsyncRoutine(ctx, key, count, peerOut) return peerOut } @@ -618,6 +596,9 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key cid.Cid, Extra: err.Error(), }) } + + // refresh the k-bucket containing this key after the query is run + dht.routingTable.BucketForID(kb.ConvertKey(key.KeyString())).ResetRefreshedAt(time.Now()) } // FindPeer searches for a peer with given ID. 
@@ -648,11 +629,6 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, } } - // refresh the k-bucket containing this key - defer func() { - dht.routingTable.BucketForID(kb.ConvertPeerID(id)).ResetRefreshedAt(time.Now()) - }() - // setup the Query parent := ctx query := dht.newQuery(string(id), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) { @@ -694,6 +670,9 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, return peer.AddrInfo{}, err } + // refresh the k-bucket containing this key since the lookup was successful + dht.routingTable.BucketForID(kb.ConvertPeerID(id)).ResetRefreshedAt(time.Now()) + logger.Debugf("FindPeer %v %v", id, result.success) if result.peer.ID == "" { return peer.AddrInfo{}, routing.ErrNotFound @@ -714,11 +693,6 @@ func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (< return nil, kb.ErrLookupFailure } - // refresh the k-bucket containing this key - defer func() { - dht.routingTable.BucketForID(kb.ConvertPeerID(id)).ResetRefreshedAt(time.Now()) - }() - // setup the Query query := dht.newQuery(string(id), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) { @@ -767,6 +741,9 @@ func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (< logger.Debug(err) } + // refresh the k-bucket containing this key + dht.routingTable.BucketForID(kb.ConvertPeerID(id)).ResetRefreshedAt(time.Now()) + // close the peerchan channel when done. close(peerchan) }() From f4630f62d5e9ff42724da44bd51bd086fa35abe6 Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Sun, 1 Sep 2019 22:10:40 +0800 Subject: [PATCH 8/9] 1) seed RT whenever it becomes empty 2) seed RT if empty before starting bootstrap incase 1 hasn't fired 3) pass bootstrap config as option while creating Dht 4) replace all bootstrap function with 1 function --- dht.go | 99 ++++++++++++++++++++++++++++++------ dht_bootstrap.go | 129 +++++++++++++++-------------------------------- dht_test.go | 8 +-- go.mod | 1 + opts/options.go | 45 ++++++++++++++--- 5 files changed, 165 insertions(+), 117 deletions(-) diff --git a/dht.go b/dht.go index 66e0508..973b7de 100644 --- a/dht.go +++ b/dht.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "github.com/google/uuid" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" @@ -37,10 +38,6 @@ import ( var logger = logging.Logger("dht") -// NumBootstrapQueries defines the number of random dht queries to do to -// collect members of the routing table. -const NumBootstrapQueries = 5 - // IpfsDHT is an implementation of Kademlia with S/Kademlia modifications. // It is used to implement the base Routing module. type IpfsDHT struct { @@ -70,6 +67,10 @@ type IpfsDHT struct { protocols []protocol.ID // DHT protocols bucketSize int + + bootstrapCfg opts.BootstrapConfig + + rtRecoveryChan chan *rtRecoveryReq } // Assert that IPFS assumptions about interfaces aren't broken. These aren't a @@ -82,6 +83,15 @@ var ( _ routing.ValueStore = (*IpfsDHT)(nil) ) +type rtRecoveryReq struct { + id string + errorChan chan error +} + +func mkRtRecoveryReq() *rtRecoveryReq { + return &rtRecoveryReq{uuid.New().String(), make(chan error, 1)} +} + // New creates a new DHT with the specified host and options. 
func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, error) { var cfg opts.Options @@ -90,6 +100,7 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er return nil, err } dht := makeDHT(ctx, h, cfg.Datastore, cfg.Protocols, cfg.BucketSize) + dht.bootstrapCfg = cfg.BootstrapConfig // register for network notifs. dht.host.Network().Notify((*netNotifiee)(dht)) @@ -103,6 +114,11 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er dht.proc.AddChild(dht.providers.Process()) dht.Validator = cfg.Validator + // RT recovery proc + rtRecoveryProc := goprocessctx.WithContext(ctx) + rtRecoveryProc.Go(dht.rtRecovery) + dht.proc.AddChild(rtRecoveryProc) + if !cfg.Client { for _, p := range cfg.Protocols { h.SetStreamHandler(p, dht.handleNewStream) @@ -136,27 +152,48 @@ func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []protocol.ID, bucketSize int) *IpfsDHT { rt := kb.NewRoutingTable(bucketSize, kb.ConvertPeerID(h.ID()), time.Minute, h.Peerstore()) + rtRecoveryChan := make(chan *rtRecoveryReq) cmgr := h.ConnManager() + rt.PeerAdded = func(p peer.ID) { cmgr.TagPeer(p, "kbucket", 5) } + rt.PeerRemoved = func(p peer.ID) { cmgr.UntagPeer(p, "kbucket") + go func(rtRecoveryChan chan *rtRecoveryReq) { + if rt.Size() == 0 { + req := mkRtRecoveryReq() + logger.Warningf("rt peer removed notification: RT is empty, will attempt to initiate recovery, reqID=%s", req.id) + select { + case <-ctx.Done(): + return + case rtRecoveryChan <- req: + select { + case <-ctx.Done(): + return + case <-req.errorChan: + // TODO Do we need to do anything here ? + } + } + } + }(rtRecoveryChan) } dht := &IpfsDHT{ - datastore: dstore, - self: h.ID(), - peerstore: h.Peerstore(), - host: h, - strmap: make(map[peer.ID]*messageSender), - ctx: ctx, - providers: providers.NewProviderManager(ctx, h.ID(), dstore), - birth: time.Now(), - routingTable: rt, - protocols: protocols, - bucketSize: bucketSize, + datastore: dstore, + self: h.ID(), + peerstore: h.Peerstore(), + host: h, + strmap: make(map[peer.ID]*messageSender), + ctx: ctx, + providers: providers.NewProviderManager(ctx, h.ID(), dstore), + birth: time.Now(), + routingTable: rt, + protocols: protocols, + bucketSize: bucketSize, + rtRecoveryChan: rtRecoveryChan, } dht.ctx = dht.newContextWithLocalTags(ctx) @@ -164,6 +201,38 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p return dht } +func (dht *IpfsDHT) rtRecovery(proc goprocess.Process) { + writeResp := func(errorChan chan error, err error) { + select { + case <-proc.Closing(): + case errorChan <- err: + } + close(errorChan) + } + + for { + select { + case req := <-dht.rtRecoveryChan: + if dht.routingTable.Size() == 0 { + logger.Infof("rt recovery proc: received request with reqID=%s, RT is empty. 
initiating recovery", req.id) + // TODO Call Seeder with default bootstrap peers here once #383 is merged + if dht.routingTable.Size() > 0 { + logger.Infof("rt recovery proc: successfully recovered RT for reqID=%s, RT size is now %d", req.id, dht.routingTable.Size()) + go writeResp(req.errorChan, nil) + } else { + logger.Errorf("rt recovery proc: failed to recover RT for reqID=%s, RT is still empty", req.id) + go writeResp(req.errorChan, errors.New("RT empty after seed attempt")) + } + } else { + logger.Infof("rt recovery proc: RT is not empty, no need to act on request with reqID=%s", req.id) + go writeResp(req.errorChan, nil) + } + case <-proc.Closing(): + return + } + } +} + // putValueToPeer stores the given key/value pair at the peer 'p' func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID, rec *recpb.Record) error { diff --git a/dht_bootstrap.go b/dht_bootstrap.go index 5a596bf..5ecfa86 100644 --- a/dht_bootstrap.go +++ b/dht_bootstrap.go @@ -2,14 +2,11 @@ package dht import ( "context" - "crypto/rand" "fmt" "strings" "sync" "time" - u "github.com/ipfs/go-ipfs-util" - "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/routing" "github.com/multiformats/go-multiaddr" _ "github.com/multiformats/go-multiaddr-dns" @@ -42,43 +39,36 @@ func init() { } } -// BootstrapConfig specifies parameters used for bootstrapping the DHT. -type BootstrapConfig struct { - BucketPeriod time.Duration // how long to wait for a k-bucket to be queried before doing a random walk on it - Timeout time.Duration // how long to wait for a bootstrap query to run - RoutingTableScanInterval time.Duration // how often to scan the RT for k-buckets that haven't been queried since the given period - SelfQueryInterval time.Duration // how often to query for self -} - -var DefaultBootstrapConfig = BootstrapConfig{ - // same as that mentioned in the kad dht paper - BucketPeriod: 1 * time.Hour, - - // since the default bucket period is 1 hour, a scan interval of 30 minutes sounds reasonable - RoutingTableScanInterval: 30 * time.Minute, - - Timeout: 10 * time.Second, - - SelfQueryInterval: 1 * time.Hour, -} - -// A method in the IpfsRouting interface. It calls BootstrapWithConfig with -// the default bootstrap config. -func (dht *IpfsDHT) Bootstrap(ctx context.Context) error { - return dht.BootstrapWithConfig(ctx, DefaultBootstrapConfig) -} - // Runs cfg.Queries bootstrap queries every cfg.BucketPeriod. -func (dht *IpfsDHT) BootstrapWithConfig(ctx context.Context, cfg BootstrapConfig) error { +func (dht *IpfsDHT) Bootstrap(ctx context.Context) error { + seedRTIfEmpty := func(tag string) { + if dht.routingTable.Size() == 0 { + req := mkRtRecoveryReq() + logger.Warningf("dht bootstrap: %s: RT is empty, will attempt to initiate recovery, reqID=%s", tag, req.id) + select { + case <-ctx.Done(): + return + case dht.rtRecoveryChan <- req: + select { + case <-ctx.Done(): + return + case <-req.errorChan: + // TODO Should we abort the ONGOING bootstrap attempt if seeder returns an error on the channel ? 
+ } + } + } + } + // we should query for self periodically so we can discover closer peers go func() { for { - err := dht.BootstrapSelf(ctx) + seedRTIfEmpty("self walk") + err := dht.selfWalk(ctx) if err != nil { logger.Warningf("error bootstrapping while searching for my self (I'm Too Shallow ?): %s", err) } select { - case <-time.After(cfg.SelfQueryInterval): + case <-time.After(dht.bootstrapCfg.SelfQueryInterval): case <-ctx.Done(): return } @@ -88,12 +78,13 @@ func (dht *IpfsDHT) BootstrapWithConfig(ctx context.Context, cfg BootstrapConfig // scan the RT table periodically & do a random walk on k-buckets that haven't been queried since the given bucket period go func() { for { - err := dht.runBootstrap(ctx, cfg) + seedRTIfEmpty("buckets") + err := dht.bootstrapBuckets(ctx) if err != nil { logger.Warningf("error bootstrapping: %s", err) } select { - case <-time.After(cfg.RoutingTableScanInterval): + case <-time.After(dht.bootstrapCfg.RoutingTableScanInterval): case <-ctx.Done(): return } @@ -102,49 +93,8 @@ func (dht *IpfsDHT) BootstrapWithConfig(ctx context.Context, cfg BootstrapConfig return nil } -func newRandomPeerId() peer.ID { - id := make([]byte, 32) // SHA256 is the default. TODO: Use a more canonical way to generate random IDs. - rand.Read(id) - id = u.Hash(id) // TODO: Feed this directly into the multihash instead of hashing it. - return peer.ID(id) -} - -// Traverse the DHT toward the given ID. -func (dht *IpfsDHT) walk(ctx context.Context, target peer.ID) (peer.AddrInfo, error) { - // TODO: Extract the query action (traversal logic?) inside FindPeer, - // don't actually call through the FindPeer machinery, which can return - // things out of the peer store etc. - return dht.FindPeer(ctx, target) -} - -// Traverse the DHT toward a random ID. -func (dht *IpfsDHT) randomWalk(ctx context.Context) error { - id := newRandomPeerId() - p, err := dht.walk(ctx, id) - switch err { - case routing.ErrNotFound: - return nil - case nil: - // We found a peer from a randomly generated ID. This should be very - // unlikely. 
- logger.Warningf("random walk toward %s actually found peer: %s", id, p) - return nil - default: - return err - } -} - -// Traverse the DHT toward the self ID -func (dht *IpfsDHT) selfWalk(ctx context.Context) error { - _, err := dht.walk(ctx, dht.self) - if err == routing.ErrNotFound { - return nil - } - return err -} - //scan the RT,& do a random walk on k-buckets that haven't been queried since the given bucket period -func (dht *IpfsDHT) runBootstrap(ctx context.Context, cfg BootstrapConfig) error { +func (dht *IpfsDHT) bootstrapBuckets(ctx context.Context) error { doQuery := func(n int, target string, f func(context.Context) error) error { logger.Infof("starting bootstrap query for bucket %d to %s (routing table size was %d)", n, target, dht.routingTable.Size()) @@ -152,7 +102,7 @@ func (dht *IpfsDHT) runBootstrap(ctx context.Context, cfg BootstrapConfig) error logger.Infof("finished bootstrap query for bucket %d to %s (routing table size is now %d)", n, target, dht.routingTable.Size()) }() - queryCtx, cancel := context.WithTimeout(ctx, cfg.Timeout) + queryCtx, cancel := context.WithTimeout(ctx, dht.bootstrapCfg.Timeout) defer cancel() err := f(queryCtx) if err == context.DeadlineExceeded && queryCtx.Err() == context.DeadlineExceeded && ctx.Err() == nil { @@ -166,7 +116,7 @@ func (dht *IpfsDHT) runBootstrap(ctx context.Context, cfg BootstrapConfig) error errChan := make(chan error) for bucketID, bucket := range buckets { - if time.Since(bucket.RefreshedAt()) > cfg.BucketPeriod { + if time.Since(bucket.RefreshedAt()) > dht.bootstrapCfg.BucketPeriod { wg.Add(1) go func(bucketID int, errChan chan<- error) { defer wg.Done() @@ -175,7 +125,7 @@ func (dht *IpfsDHT) runBootstrap(ctx context.Context, cfg BootstrapConfig) error // walk to the generated peer walkFnc := func(c context.Context) error { - _, err := dht.walk(ctx, randPeerInBucket) + _, err := dht.FindPeer(ctx, randPeerInBucket) if err == routing.ErrNotFound { return nil } @@ -207,19 +157,20 @@ func (dht *IpfsDHT) runBootstrap(ctx context.Context, cfg BootstrapConfig) error } } -// This is a synchronous bootstrap. -func (dht *IpfsDHT) BootstrapOnce(ctx context.Context, cfg BootstrapConfig) error { - if err := dht.BootstrapSelf(ctx); err != nil { +// synchronous bootstrap. +func (dht *IpfsDHT) bootstrapOnce(ctx context.Context) error { + if err := dht.selfWalk(ctx); err != nil { return errors.Wrap(err, "failed bootstrap while searching for self") } else { - return dht.runBootstrap(ctx, cfg) + return dht.bootstrapBuckets(ctx) } } -func (dht *IpfsDHT) BootstrapRandom(ctx context.Context) error { - return dht.randomWalk(ctx) -} - -func (dht *IpfsDHT) BootstrapSelf(ctx context.Context) error { - return dht.selfWalk(ctx) +// Traverse the DHT toward the self ID +func (dht *IpfsDHT) selfWalk(ctx context.Context) error { + _, err := dht.FindPeer(ctx, dht.self) + if err == routing.ErrNotFound { + return nil + } + return err } diff --git a/dht_test.go b/dht_test.go index 0e27ec2..18dc04a 100644 --- a/dht_test.go +++ b/dht_test.go @@ -197,12 +197,10 @@ func bootstrap(t *testing.T, ctx context.Context, dhts []*IpfsDHT) { // 100 sync https://gist.github.com/jbenet/6c59e7c15426e48aaedd // probably because results compound - cfg := DefaultBootstrapConfig - start := rand.Intn(len(dhts)) // randomize to decrease bias. 
for i := range dhts { dht := dhts[(start+i)%len(dhts)] - dht.BootstrapOnce(ctx, cfg) + dht.bootstrapOnce(ctx) } } @@ -710,8 +708,6 @@ func TestPeriodicBootstrap(t *testing.T) { } }() - cfg := DefaultBootstrapConfig - t.Logf("dhts are not connected. %d", nDHTs) for _, dht := range dhts { rtlen := dht.routingTable.Size() @@ -738,7 +734,7 @@ func TestPeriodicBootstrap(t *testing.T) { t.Logf("bootstrapping them so they find each other. %d", nDHTs) for _, dht := range dhts { - go dht.BootstrapOnce(ctx, cfg) + go dht.bootstrapOnce(ctx) } // this is async, and we dont know when it's finished with one cycle, so keep checking diff --git a/go.mod b/go.mod index fa6cda3..bf24e9f 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.12 require ( github.com/gogo/protobuf v1.3.0 + github.com/google/uuid v1.1.1 github.com/hashicorp/golang-lru v0.5.3 github.com/ipfs/go-cid v0.0.3 github.com/ipfs/go-datastore v0.1.0 diff --git a/opts/options.go b/opts/options.go index 5840d39..2fbb91e 100644 --- a/opts/options.go +++ b/opts/options.go @@ -2,11 +2,12 @@ package dhtopts import ( "fmt" + "time" ds "github.com/ipfs/go-datastore" dssync "github.com/ipfs/go-datastore/sync" - protocol "github.com/libp2p/go-libp2p-core/protocol" - record "github.com/libp2p/go-libp2p-record" + "github.com/libp2p/go-libp2p-core/protocol" + "github.com/libp2p/go-libp2p-record" ) // Deprecated: The old format did not support more than one message per stream, and is not supported @@ -18,13 +19,22 @@ var ( DefaultProtocols = []protocol.ID{ProtocolDHT} ) +// BootstrapConfig specifies parameters used for bootstrapping the DHT. +type BootstrapConfig struct { + BucketPeriod time.Duration // how long to wait for a k-bucket to be queried before doing a random walk on it + Timeout time.Duration // how long to wait for a bootstrap query to run + RoutingTableScanInterval time.Duration // how often to scan the RT for k-buckets that haven't been queried since the given period + SelfQueryInterval time.Duration // how often to query for self +} + // Options is a structure containing all the options that can be used when constructing a DHT. type Options struct { - Datastore ds.Batching - Validator record.Validator - Client bool - Protocols []protocol.ID - BucketSize int + Datastore ds.Batching + Validator record.Validator + Client bool + Protocols []protocol.ID + BucketSize int + BootstrapConfig BootstrapConfig } // Apply applies the given options to this Option @@ -48,9 +58,30 @@ var Defaults = func(o *Options) error { } o.Datastore = dssync.MutexWrap(ds.NewMapDatastore()) o.Protocols = DefaultProtocols + + o.BootstrapConfig = BootstrapConfig{ + // same as that mentioned in the kad dht paper + BucketPeriod: 1 * time.Hour, + + // since the default bucket period is 1 hour, a scan interval of 30 minutes sounds reasonable + RoutingTableScanInterval: 30 * time.Minute, + + Timeout: 10 * time.Second, + + SelfQueryInterval: 1 * time.Hour, + } + return nil } +// Bootstrap configures the dht bootstrapping process +func Bootstrap(b BootstrapConfig) Option { + return func(o *Options) error { + o.BootstrapConfig = b + return nil + } +} + // Datastore configures the DHT to use the specified datastore. // // Defaults to an in-memory (temporary) map. 
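Patch 8 moves the bootstrap parameters into the options package, so callers now tune bootstrapping when constructing the DHT instead of passing a config to a later bootstrap call. A minimal sketch of how the new Bootstrap option might be used, assuming the options package is imported as dhtopts; the helper name and the duration values are purely illustrative:

package example

import (
	"context"
	"time"

	"github.com/libp2p/go-libp2p-core/host"
	dht "github.com/libp2p/go-libp2p-kad-dht"
	dhtopts "github.com/libp2p/go-libp2p-kad-dht/opts"
)

// newDHTWithCustomBootstrap is a hypothetical helper showing the option added
// in this patch; the chosen durations are examples, not recommendations.
func newDHTWithCustomBootstrap(ctx context.Context, h host.Host) (*dht.IpfsDHT, error) {
	cfg := dhtopts.BootstrapConfig{
		BucketPeriod:             30 * time.Minute, // refresh a bucket if it hasn't been queried for this long
		RoutingTableScanInterval: 10 * time.Minute, // how often to scan the routing table for stale buckets
		Timeout:                  10 * time.Second, // per-query timeout
		SelfQueryInterval:        30 * time.Minute, // how often to search for our own ID
	}
	return dht.New(ctx, h, dhtopts.Bootstrap(cfg))
}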
From 00fffba0aa6948e7549752197f63b85f293a66e9 Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Thu, 5 Sep 2019 22:09:16 +0800 Subject: [PATCH 9/9] Update dht_bootstrap.go 1) on connecting to a new peer -> trigger self & bucket bootstrap if RT size goes below thereshold 2) accept formatting & doc suggestions in the review 3) remove RT recovery code for now -> will address in a separate PR once #383 goes in changes as per review --- dht.go | 81 +++++++++++++++--------------------------------- dht_bootstrap.go | 70 ++++++++++++++++++++--------------------- dht_test.go | 49 +++++++++++++++++++++++++++++ notif.go | 14 +++++++++ 4 files changed, 123 insertions(+), 91 deletions(-) diff --git a/dht.go b/dht.go index 973b7de..471ae4e 100644 --- a/dht.go +++ b/dht.go @@ -8,7 +8,6 @@ import ( "sync" "time" - "github.com/google/uuid" "github.com/libp2p/go-libp2p-core/host" "github.com/libp2p/go-libp2p-core/network" "github.com/libp2p/go-libp2p-core/peer" @@ -22,18 +21,18 @@ import ( "github.com/libp2p/go-libp2p-kad-dht/metrics" opts "github.com/libp2p/go-libp2p-kad-dht/opts" pb "github.com/libp2p/go-libp2p-kad-dht/pb" - providers "github.com/libp2p/go-libp2p-kad-dht/providers" + "github.com/libp2p/go-libp2p-kad-dht/providers" - proto "github.com/gogo/protobuf/proto" - cid "github.com/ipfs/go-cid" + "github.com/gogo/protobuf/proto" + "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" logging "github.com/ipfs/go-log" - goprocess "github.com/jbenet/goprocess" - goprocessctx "github.com/jbenet/goprocess/context" + "github.com/jbenet/goprocess" + "github.com/jbenet/goprocess/context" kb "github.com/libp2p/go-libp2p-kbucket" - record "github.com/libp2p/go-libp2p-record" + "github.com/libp2p/go-libp2p-record" recpb "github.com/libp2p/go-libp2p-record/pb" - base32 "github.com/whyrusleeping/base32" + "github.com/whyrusleeping/base32" ) var logger = logging.Logger("dht") @@ -70,7 +69,7 @@ type IpfsDHT struct { bootstrapCfg opts.BootstrapConfig - rtRecoveryChan chan *rtRecoveryReq + triggerBootstrap chan struct{} } // Assert that IPFS assumptions about interfaces aren't broken. These aren't a @@ -83,15 +82,6 @@ var ( _ routing.ValueStore = (*IpfsDHT)(nil) ) -type rtRecoveryReq struct { - id string - errorChan chan error -} - -func mkRtRecoveryReq() *rtRecoveryReq { - return &rtRecoveryReq{uuid.New().String(), make(chan error, 1)} -} - // New creates a new DHT with the specified host and options. 
func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, error) { var cfg opts.Options @@ -114,11 +104,6 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er dht.proc.AddChild(dht.providers.Process()) dht.Validator = cfg.Validator - // RT recovery proc - rtRecoveryProc := goprocessctx.WithContext(ctx) - rtRecoveryProc.Go(dht.rtRecovery) - dht.proc.AddChild(rtRecoveryProc) - if !cfg.Client { for _, p := range cfg.Protocols { h.SetStreamHandler(p, dht.handleNewStream) @@ -152,8 +137,6 @@ func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []protocol.ID, bucketSize int) *IpfsDHT { rt := kb.NewRoutingTable(bucketSize, kb.ConvertPeerID(h.ID()), time.Minute, h.Peerstore()) - rtRecoveryChan := make(chan *rtRecoveryReq) - cmgr := h.ConnManager() rt.PeerAdded = func(p peer.ID) { @@ -162,38 +145,21 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p rt.PeerRemoved = func(p peer.ID) { cmgr.UntagPeer(p, "kbucket") - go func(rtRecoveryChan chan *rtRecoveryReq) { - if rt.Size() == 0 { - req := mkRtRecoveryReq() - logger.Warningf("rt peer removed notification: RT is empty, will attempt to initiate recovery, reqID=%s", req.id) - select { - case <-ctx.Done(): - return - case rtRecoveryChan <- req: - select { - case <-ctx.Done(): - return - case <-req.errorChan: - // TODO Do we need to do anything here ? - } - } - } - }(rtRecoveryChan) } dht := &IpfsDHT{ - datastore: dstore, - self: h.ID(), - peerstore: h.Peerstore(), - host: h, - strmap: make(map[peer.ID]*messageSender), - ctx: ctx, - providers: providers.NewProviderManager(ctx, h.ID(), dstore), - birth: time.Now(), - routingTable: rt, - protocols: protocols, - bucketSize: bucketSize, - rtRecoveryChan: rtRecoveryChan, + datastore: dstore, + self: h.ID(), + peerstore: h.Peerstore(), + host: h, + strmap: make(map[peer.ID]*messageSender), + ctx: ctx, + providers: providers.NewProviderManager(ctx, h.ID(), dstore), + birth: time.Now(), + routingTable: rt, + protocols: protocols, + bucketSize: bucketSize, + triggerBootstrap: make(chan struct{}), } dht.ctx = dht.newContextWithLocalTags(ctx) @@ -201,7 +167,10 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p return dht } -func (dht *IpfsDHT) rtRecovery(proc goprocess.Process) { +// TODO Implement RT seeding as described in https://github.com/libp2p/go-libp2p-kad-dht/pull/384#discussion_r320994340 OR +// come up with an alternative solution. +// issue is being tracked at https://github.com/libp2p/go-libp2p-kad-dht/issues/387 +/*func (dht *IpfsDHT) rtRecovery(proc goprocess.Process) { writeResp := func(errorChan chan error, err error) { select { case <-proc.Closing(): @@ -231,7 +200,7 @@ func (dht *IpfsDHT) rtRecovery(proc goprocess.Process) { return } } -} +}*/ // putValueToPeer stores the given key/value pair at the peer 'p' func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID, rec *recpb.Record) error { diff --git a/dht_bootstrap.go b/dht_bootstrap.go index 5ecfa86..6e40c59 100644 --- a/dht_bootstrap.go +++ b/dht_bootstrap.go @@ -15,6 +15,8 @@ import ( var DefaultBootstrapPeers []multiaddr.Multiaddr +var minRTBootstrapThreshold = 4 + func init() { for _, s := range []string{ "/dnsaddr/bootstrap.libp2p.io/ipfs/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN", @@ -39,33 +41,27 @@ func init() { } } -// Runs cfg.Queries bootstrap queries every cfg.BucketPeriod. 
+// BootstrapConfig runs cfg.Queries bootstrap queries every cfg.BucketPeriod. func (dht *IpfsDHT) Bootstrap(ctx context.Context) error { - seedRTIfEmpty := func(tag string) { - if dht.routingTable.Size() == 0 { - req := mkRtRecoveryReq() - logger.Warningf("dht bootstrap: %s: RT is empty, will attempt to initiate recovery, reqID=%s", tag, req.id) - select { - case <-ctx.Done(): - return - case dht.rtRecoveryChan <- req: - select { - case <-ctx.Done(): - return - case <-req.errorChan: - // TODO Should we abort the ONGOING bootstrap attempt if seeder returns an error on the channel ? - } - } + triggerBootstrapFnc := func() { + logger.Infof("triggerBootstrapFnc: RT only has %d peers which is less than the min threshold of %d, triggering self & bucket bootstrap", + dht.routingTable.Size(), minRTBootstrapThreshold) + + if err := dht.selfWalk(ctx); err != nil { + logger.Warningf("triggerBootstrapFnc: self walk: error: %s", err) + } + + if err := dht.bootstrapBuckets(ctx); err != nil { + logger.Warningf("triggerBootstrapFnc: bootstrap buckets: error bootstrapping: %s", err) } } // we should query for self periodically so we can discover closer peers go func() { for { - seedRTIfEmpty("self walk") err := dht.selfWalk(ctx) if err != nil { - logger.Warningf("error bootstrapping while searching for my self (I'm Too Shallow ?): %s", err) + logger.Warningf("self walk: error: %s", err) } select { case <-time.After(dht.bootstrapCfg.SelfQueryInterval): @@ -78,29 +74,31 @@ func (dht *IpfsDHT) Bootstrap(ctx context.Context) error { // scan the RT table periodically & do a random walk on k-buckets that haven't been queried since the given bucket period go func() { for { - seedRTIfEmpty("buckets") err := dht.bootstrapBuckets(ctx) if err != nil { - logger.Warningf("error bootstrapping: %s", err) + logger.Warningf("bootstrap buckets: error bootstrapping: %s", err) } select { case <-time.After(dht.bootstrapCfg.RoutingTableScanInterval): + case <-dht.triggerBootstrap: + triggerBootstrapFnc() case <-ctx.Done(): return } } }() + return nil } -//scan the RT,& do a random walk on k-buckets that haven't been queried since the given bucket period +// bootstrapBuckets scans the routing table, and does a random walk on k-buckets that haven't been queried since the given bucket period func (dht *IpfsDHT) bootstrapBuckets(ctx context.Context) error { - doQuery := func(n int, target string, f func(context.Context) error) error { + doQuery := func(bucketId int, target string, f func(context.Context) error) error { logger.Infof("starting bootstrap query for bucket %d to %s (routing table size was %d)", - n, target, dht.routingTable.Size()) + bucketId, target, dht.routingTable.Size()) defer func() { logger.Infof("finished bootstrap query for bucket %d to %s (routing table size is now %d)", - n, target, dht.routingTable.Size()) + bucketId, target, dht.routingTable.Size()) }() queryCtx, cancel := context.WithTimeout(ctx, dht.bootstrapCfg.Timeout) defer cancel() @@ -145,7 +143,7 @@ func (dht *IpfsDHT) bootstrapBuckets(ctx context.Context) error { close(errChan) }() - // accumulate errors from all go-routines + // accumulate errors from all go-routines. ensures wait group is completed by reading errChan until closure. 
var errStrings []string for err := range errChan { errStrings = append(errStrings, err.Error()) @@ -153,10 +151,21 @@ func (dht *IpfsDHT) bootstrapBuckets(ctx context.Context) error { if len(errStrings) == 0 { return nil } else { - return fmt.Errorf("errors encountered while running bootstrap on RT: %s", strings.Join(errStrings, "\n")) + return fmt.Errorf("errors encountered while running bootstrap on RT:\n%s", strings.Join(errStrings, "\n")) } } +// Traverse the DHT toward the self ID +func (dht *IpfsDHT) selfWalk(ctx context.Context) error { + queryCtx, cancel := context.WithTimeout(ctx, dht.bootstrapCfg.Timeout) + defer cancel() + _, err := dht.FindPeer(queryCtx, dht.self) + if err == routing.ErrNotFound { + return nil + } + return err +} + // synchronous bootstrap. func (dht *IpfsDHT) bootstrapOnce(ctx context.Context) error { if err := dht.selfWalk(ctx); err != nil { @@ -165,12 +174,3 @@ func (dht *IpfsDHT) bootstrapOnce(ctx context.Context) error { return dht.bootstrapBuckets(ctx) } } - -// Traverse the DHT toward the self ID -func (dht *IpfsDHT) selfWalk(ctx context.Context) error { - _, err := dht.FindPeer(ctx, dht.self) - if err == routing.ErrNotFound { - return nil - } - return err -} diff --git a/dht_test.go b/dht_test.go index 18dc04a..bba86c8 100644 --- a/dht_test.go +++ b/dht_test.go @@ -688,6 +688,55 @@ func TestBootstrap(t *testing.T) { } } +func TestBootstrapBelowMinRTThreshold(t *testing.T) { + ctx := context.Background() + dhtA := setupDHT(ctx, t, false) + dhtB := setupDHT(ctx, t, false) + dhtC := setupDHT(ctx, t, false) + + defer func() { + dhtA.Close() + dhtA.host.Close() + + dhtB.Close() + dhtB.host.Close() + + dhtC.Close() + dhtC.host.Close() + }() + + connect(t, ctx, dhtA, dhtB) + connect(t, ctx, dhtB, dhtC) + + // we ONLY init bootstrap on A + dhtA.Bootstrap(ctx) + // and wait for one round to complete i.e. A should be connected to both B & C + waitForWellFormedTables(t, []*IpfsDHT{dhtA}, 2, 2, 20*time.Second) + + // now we create two new peers + dhtD := setupDHT(ctx, t, false) + dhtE := setupDHT(ctx, t, false) + + // connect them to each other + connect(t, ctx, dhtD, dhtE) + defer func() { + dhtD.Close() + dhtD.host.Close() + + dhtE.Close() + dhtE.host.Close() + }() + + // and then, on connecting the peer D to A, the min RT threshold gets triggered on A which leads to a bootstrap. + // since the default bootstrap scan interval is 30 mins - 1 hour, we can be sure that if bootstrap happens, + // it is because of the min RT threshold getting triggered (since default min value is 4 & we only have 2 peers in the RT when D gets connected) + connect(t, ctx, dhtA, dhtD) + + // and because of the above bootstrap, A also discovers E ! + waitForWellFormedTables(t, []*IpfsDHT{dhtA}, 4, 4, 20*time.Second) + assert.Equal(t, dhtE.self, dhtA.routingTable.Find(dhtE.self), "A's routing table should have peer E!") +} + func TestPeriodicBootstrap(t *testing.T) { if ci.IsRunning() { t.Skip("skipping on CI. 
highly timing dependent") diff --git a/notif.go b/notif.go index 3af7584..556cadd 100644 --- a/notif.go +++ b/notif.go @@ -32,7 +32,14 @@ func (nn *netNotifiee) Connected(n network.Network, v network.Conn) { dht.plk.Lock() defer dht.plk.Unlock() if dht.host.Network().Connectedness(p) == network.Connected { + bootstrap := dht.routingTable.Size() <= minRTBootstrapThreshold dht.Update(dht.Context(), p) + if bootstrap { + select { + case dht.triggerBootstrap <- struct{}{}: + default: + } + } } return } @@ -71,7 +78,14 @@ func (nn *netNotifiee) testConnection(v network.Conn) { dht.plk.Lock() defer dht.plk.Unlock() if dht.host.Network().Connectedness(p) == network.Connected { + bootstrap := dht.routingTable.Size() <= minRTBootstrapThreshold dht.Update(dht.Context(), p) + if bootstrap { + select { + case dht.triggerBootstrap <- struct{}{}: + default: + } + } } }