diff --git a/dht.go b/dht.go index 66e0508..471ae4e 100644 --- a/dht.go +++ b/dht.go @@ -21,26 +21,22 @@ import ( "github.com/libp2p/go-libp2p-kad-dht/metrics" opts "github.com/libp2p/go-libp2p-kad-dht/opts" pb "github.com/libp2p/go-libp2p-kad-dht/pb" - providers "github.com/libp2p/go-libp2p-kad-dht/providers" + "github.com/libp2p/go-libp2p-kad-dht/providers" - proto "github.com/gogo/protobuf/proto" - cid "github.com/ipfs/go-cid" + "github.com/gogo/protobuf/proto" + "github.com/ipfs/go-cid" ds "github.com/ipfs/go-datastore" logging "github.com/ipfs/go-log" - goprocess "github.com/jbenet/goprocess" - goprocessctx "github.com/jbenet/goprocess/context" + "github.com/jbenet/goprocess" + "github.com/jbenet/goprocess/context" kb "github.com/libp2p/go-libp2p-kbucket" - record "github.com/libp2p/go-libp2p-record" + "github.com/libp2p/go-libp2p-record" recpb "github.com/libp2p/go-libp2p-record/pb" - base32 "github.com/whyrusleeping/base32" + "github.com/whyrusleeping/base32" ) var logger = logging.Logger("dht") -// NumBootstrapQueries defines the number of random dht queries to do to -// collect members of the routing table. -const NumBootstrapQueries = 5 - // IpfsDHT is an implementation of Kademlia with S/Kademlia modifications. // It is used to implement the base Routing module. type IpfsDHT struct { @@ -70,6 +66,10 @@ type IpfsDHT struct { protocols []protocol.ID // DHT protocols bucketSize int + + bootstrapCfg opts.BootstrapConfig + + triggerBootstrap chan struct{} } // Assert that IPFS assumptions about interfaces aren't broken. These aren't a @@ -90,6 +90,7 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er return nil, err } dht := makeDHT(ctx, h, cfg.Datastore, cfg.Protocols, cfg.BucketSize) + dht.bootstrapCfg = cfg.BootstrapConfig // register for network notifs. dht.host.Network().Notify((*netNotifiee)(dht)) @@ -136,27 +137,29 @@ func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []protocol.ID, bucketSize int) *IpfsDHT { rt := kb.NewRoutingTable(bucketSize, kb.ConvertPeerID(h.ID()), time.Minute, h.Peerstore()) - cmgr := h.ConnManager() + rt.PeerAdded = func(p peer.ID) { cmgr.TagPeer(p, "kbucket", 5) } + rt.PeerRemoved = func(p peer.ID) { cmgr.UntagPeer(p, "kbucket") } dht := &IpfsDHT{ - datastore: dstore, - self: h.ID(), - peerstore: h.Peerstore(), - host: h, - strmap: make(map[peer.ID]*messageSender), - ctx: ctx, - providers: providers.NewProviderManager(ctx, h.ID(), dstore), - birth: time.Now(), - routingTable: rt, - protocols: protocols, - bucketSize: bucketSize, + datastore: dstore, + self: h.ID(), + peerstore: h.Peerstore(), + host: h, + strmap: make(map[peer.ID]*messageSender), + ctx: ctx, + providers: providers.NewProviderManager(ctx, h.ID(), dstore), + birth: time.Now(), + routingTable: rt, + protocols: protocols, + bucketSize: bucketSize, + triggerBootstrap: make(chan struct{}), } dht.ctx = dht.newContextWithLocalTags(ctx) @@ -164,6 +167,41 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p return dht } +// TODO Implement RT seeding as described in https://github.com/libp2p/go-libp2p-kad-dht/pull/384#discussion_r320994340 OR +// come up with an alternative solution. 
+// issue is being tracked at https://github.com/libp2p/go-libp2p-kad-dht/issues/387
+/*func (dht *IpfsDHT) rtRecovery(proc goprocess.Process) {
+	writeResp := func(errorChan chan error, err error) {
+		select {
+		case <-proc.Closing():
+		case errorChan <- err:
+		}
+		close(errorChan)
+	}
+
+	for {
+		select {
+		case req := <-dht.rtRecoveryChan:
+			if dht.routingTable.Size() == 0 {
+				logger.Infof("rt recovery proc: received request with reqID=%s, RT is empty. initiating recovery", req.id)
+				// TODO Call Seeder with default bootstrap peers here once #383 is merged
+				if dht.routingTable.Size() > 0 {
+					logger.Infof("rt recovery proc: successfully recovered RT for reqID=%s, RT size is now %d", req.id, dht.routingTable.Size())
+					go writeResp(req.errorChan, nil)
+				} else {
+					logger.Errorf("rt recovery proc: failed to recover RT for reqID=%s, RT is still empty", req.id)
+					go writeResp(req.errorChan, errors.New("RT empty after seed attempt"))
+				}
+			} else {
+				logger.Infof("rt recovery proc: RT is not empty, no need to act on request with reqID=%s", req.id)
+				go writeResp(req.errorChan, nil)
+			}
+		case <-proc.Closing():
+			return
+		}
+	}
+}*/
+
 // putValueToPeer stores the given key/value pair at the peer 'p'
 func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID, rec *recpb.Record) error {
diff --git a/dht_bootstrap.go b/dht_bootstrap.go
index 0dd1884..6e40c59 100644
--- a/dht_bootstrap.go
+++ b/dht_bootstrap.go
@@ -2,20 +2,21 @@ package dht
 
 import (
 	"context"
-	"crypto/rand"
 	"fmt"
+	"strings"
+	"sync"
 	"time"
 
-	"github.com/libp2p/go-libp2p-core/peer"
 	"github.com/libp2p/go-libp2p-core/routing"
-
-	u "github.com/ipfs/go-ipfs-util"
 	"github.com/multiformats/go-multiaddr"
 	_ "github.com/multiformats/go-multiaddr-dns"
+	"github.com/pkg/errors"
 )
 
 var DefaultBootstrapPeers []multiaddr.Multiaddr
 
+// minRTBootstrapThreshold is the routing table size at or below which a newly
+// connected peer triggers an unscheduled bootstrap (see notif.go).
+var minRTBootstrapThreshold = 4
+
 func init() {
 	for _, s := range []string{
 		"/dnsaddr/bootstrap.libp2p.io/ipfs/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN",
@@ -40,121 +41,66 @@ func init() {
 	}
 }
 
-// BootstrapConfig specifies parameters used bootstrapping the DHT.
-//
-// Note there is a tradeoff between the bootstrap period and the
-// number of queries. We could support a higher period with less
-// queries.
-type BootstrapConfig struct {
-	Queries int           // how many queries to run per period
-	Period  time.Duration // how often to run periodic bootstrap.
-	Timeout time.Duration // how long to wait for a bootstrap query to run
-}
-
-var DefaultBootstrapConfig = BootstrapConfig{
-	// For now, this is set to 1 query.
-	// We are currently more interested in ensuring we have a properly formed
-	// DHT than making sure our dht minimizes traffic. Once we are more certain
-	// of our implementation's robustness, we should lower this down to 8 or 4.
-	Queries: 1,
-
-	// For now, this is set to 5 minutes, which is a medium period. We are
-	// We are currently more interested in ensuring we have a properly formed
-	// DHT than making sure our dht minimizes traffic.
-	Period: time.Duration(5 * time.Minute),
-
-	Timeout: time.Duration(10 * time.Second),
-}
-
-// A method in the IpfsRouting interface. It calls BootstrapWithConfig with
-// the default bootstrap config.
+// Bootstrap satisfies the routing interface. It starts two background loops:
+// one queries for self every SelfQueryInterval, and one scans the routing table
+// every RoutingTableScanInterval and does a random walk on any k-bucket that
+// hasn't been queried within BucketPeriod.
func (dht *IpfsDHT) Bootstrap(ctx context.Context) error { - return dht.BootstrapWithConfig(ctx, DefaultBootstrapConfig) -} + triggerBootstrapFnc := func() { + logger.Infof("triggerBootstrapFnc: RT only has %d peers which is less than the min threshold of %d, triggering self & bucket bootstrap", + dht.routingTable.Size(), minRTBootstrapThreshold) -// Runs cfg.Queries bootstrap queries every cfg.Period. -func (dht *IpfsDHT) BootstrapWithConfig(ctx context.Context, cfg BootstrapConfig) error { - // Because this method is not synchronous, we have to duplicate sanity - // checks on the config so that callers aren't oblivious. - if cfg.Queries <= 0 { - return fmt.Errorf("invalid number of queries: %d", cfg.Queries) + if err := dht.selfWalk(ctx); err != nil { + logger.Warningf("triggerBootstrapFnc: self walk: error: %s", err) + } + + if err := dht.bootstrapBuckets(ctx); err != nil { + logger.Warningf("triggerBootstrapFnc: bootstrap buckets: error bootstrapping: %s", err) + } } + + // we should query for self periodically so we can discover closer peers go func() { for { - err := dht.runBootstrap(ctx, cfg) + err := dht.selfWalk(ctx) if err != nil { - logger.Warningf("error bootstrapping: %s", err) + logger.Warningf("self walk: error: %s", err) } select { - case <-time.After(cfg.Period): + case <-time.After(dht.bootstrapCfg.SelfQueryInterval): case <-ctx.Done(): return } } }() + + // scan the RT table periodically & do a random walk on k-buckets that haven't been queried since the given bucket period + go func() { + for { + err := dht.bootstrapBuckets(ctx) + if err != nil { + logger.Warningf("bootstrap buckets: error bootstrapping: %s", err) + } + select { + case <-time.After(dht.bootstrapCfg.RoutingTableScanInterval): + case <-dht.triggerBootstrap: + triggerBootstrapFnc() + case <-ctx.Done(): + return + } + } + }() + return nil } -// This is a synchronous bootstrap. cfg.Queries queries will run each with a -// timeout of cfg.Timeout. cfg.Period is not used. -func (dht *IpfsDHT) BootstrapOnce(ctx context.Context, cfg BootstrapConfig) error { - if cfg.Queries <= 0 { - return fmt.Errorf("invalid number of queries: %d", cfg.Queries) - } - return dht.runBootstrap(ctx, cfg) -} - -func newRandomPeerId() peer.ID { - id := make([]byte, 32) // SHA256 is the default. TODO: Use a more canonical way to generate random IDs. - rand.Read(id) - id = u.Hash(id) // TODO: Feed this directly into the multihash instead of hashing it. - return peer.ID(id) -} - -// Traverse the DHT toward the given ID. -func (dht *IpfsDHT) walk(ctx context.Context, target peer.ID) (peer.AddrInfo, error) { - // TODO: Extract the query action (traversal logic?) inside FindPeer, - // don't actually call through the FindPeer machinery, which can return - // things out of the peer store etc. - return dht.FindPeer(ctx, target) -} - -// Traverse the DHT toward a random ID. -func (dht *IpfsDHT) randomWalk(ctx context.Context) error { - id := newRandomPeerId() - p, err := dht.walk(ctx, id) - switch err { - case routing.ErrNotFound: - return nil - case nil: - // We found a peer from a randomly generated ID. This should be very - // unlikely. 
-		logger.Warningf("random walk toward %s actually found peer: %s", id, p)
-		return nil
-	default:
-		return err
-	}
-}
-
-// Traverse the DHT toward the self ID
-func (dht *IpfsDHT) selfWalk(ctx context.Context) error {
-	_, err := dht.walk(ctx, dht.self)
-	if err == routing.ErrNotFound {
-		return nil
-	}
-	return err
-}
-
-// runBootstrap builds up list of peers by requesting random peer IDs
-func (dht *IpfsDHT) runBootstrap(ctx context.Context, cfg BootstrapConfig) error {
-	doQuery := func(n int, target string, f func(context.Context) error) error {
-		logger.Infof("starting bootstrap query (%d/%d) to %s (routing table size was %d)",
-			n, cfg.Queries, target, dht.routingTable.Size())
+// bootstrapBuckets scans the routing table and does a random walk on every k-bucket that hasn't been queried within the configured bucket period
+func (dht *IpfsDHT) bootstrapBuckets(ctx context.Context) error {
+	doQuery := func(bucketId int, target string, f func(context.Context) error) error {
+		logger.Infof("starting bootstrap query for bucket %d to %s (routing table size was %d)",
+			bucketId, target, dht.routingTable.Size())
 		defer func() {
-			logger.Infof("finished bootstrap query (%d/%d) to %s (routing table size is now %d)",
-				n, cfg.Queries, target, dht.routingTable.Size())
+			logger.Infof("finished bootstrap query for bucket %d to %s (routing table size is now %d)",
+				bucketId, target, dht.routingTable.Size())
 		}()
-		queryCtx, cancel := context.WithTimeout(ctx, cfg.Timeout)
+		queryCtx, cancel := context.WithTimeout(ctx, dht.bootstrapCfg.Timeout)
 		defer cancel()
 		err := f(queryCtx)
 		if err == context.DeadlineExceeded && queryCtx.Err() == context.DeadlineExceeded && ctx.Err() == nil {
@@ -163,22 +109,68 @@ func (dht *IpfsDHT) runBootstrap(ctx context.Context, cfg BootstrapConfig) error
 		return err
 	}
 
-	// Do all but one of the bootstrap queries as random walks.
-	for i := 0; i < cfg.Queries; i++ {
-		err := doQuery(i, "random ID", dht.randomWalk)
-		if err != nil {
-			return err
+	buckets := dht.routingTable.GetAllBuckets()
+	var wg sync.WaitGroup
+	errChan := make(chan error)
+
+	for bucketID, bucket := range buckets {
+		if time.Since(bucket.RefreshedAt()) > dht.bootstrapCfg.BucketPeriod {
+			wg.Add(1)
+			go func(bucketID int, errChan chan<- error) {
+				defer wg.Done()
+				// generate a random peer ID that falls within this bucket
+				randPeerInBucket := dht.routingTable.GenRandPeerID(bucketID)
+
+				// walk towards the generated peer, using the per-query context so the configured timeout applies
+				walkFnc := func(c context.Context) error {
+					_, err := dht.FindPeer(c, randPeerInBucket)
+					if err == routing.ErrNotFound {
+						return nil
+					}
+					return err
+				}
+
+				if err := doQuery(bucketID, randPeerInBucket.String(), walkFnc); err != nil {
+					errChan <- errors.Wrapf(err, "failed to do a random walk on bucket %d", bucketID)
+				}
+			}(bucketID, errChan)
 		}
 	}
 
-	// Find self to distribute peer info to our neighbors.
-	return doQuery(cfg.Queries, fmt.Sprintf("self: %s", dht.self), dht.selfWalk)
+	// wait for all walks to finish & close the error channel
+	go func() {
+		wg.Wait()
+		close(errChan)
+	}()
+
+	// accumulate errors from all goroutines; draining errChan until it closes also guarantees every walk has completed
+ var errStrings []string + for err := range errChan { + errStrings = append(errStrings, err.Error()) + } + if len(errStrings) == 0 { + return nil + } else { + return fmt.Errorf("errors encountered while running bootstrap on RT:\n%s", strings.Join(errStrings, "\n")) + } } -func (dht *IpfsDHT) BootstrapRandom(ctx context.Context) error { - return dht.randomWalk(ctx) +// Traverse the DHT toward the self ID +func (dht *IpfsDHT) selfWalk(ctx context.Context) error { + queryCtx, cancel := context.WithTimeout(ctx, dht.bootstrapCfg.Timeout) + defer cancel() + _, err := dht.FindPeer(queryCtx, dht.self) + if err == routing.ErrNotFound { + return nil + } + return err } -func (dht *IpfsDHT) BootstrapSelf(ctx context.Context) error { - return dht.selfWalk(ctx) +// synchronous bootstrap. +func (dht *IpfsDHT) bootstrapOnce(ctx context.Context) error { + if err := dht.selfWalk(ctx); err != nil { + return errors.Wrap(err, "failed bootstrap while searching for self") + } else { + return dht.bootstrapBuckets(ctx) + } } diff --git a/dht_test.go b/dht_test.go index 681f90c..bba86c8 100644 --- a/dht_test.go +++ b/dht_test.go @@ -197,13 +197,10 @@ func bootstrap(t *testing.T, ctx context.Context, dhts []*IpfsDHT) { // 100 sync https://gist.github.com/jbenet/6c59e7c15426e48aaedd // probably because results compound - cfg := DefaultBootstrapConfig - cfg.Queries = 3 - start := rand.Intn(len(dhts)) // randomize to decrease bias. for i := range dhts { dht := dhts[(start+i)%len(dhts)] - dht.runBootstrap(ctx, cfg) + dht.bootstrapOnce(ctx) } } @@ -691,6 +688,55 @@ func TestBootstrap(t *testing.T) { } } +func TestBootstrapBelowMinRTThreshold(t *testing.T) { + ctx := context.Background() + dhtA := setupDHT(ctx, t, false) + dhtB := setupDHT(ctx, t, false) + dhtC := setupDHT(ctx, t, false) + + defer func() { + dhtA.Close() + dhtA.host.Close() + + dhtB.Close() + dhtB.host.Close() + + dhtC.Close() + dhtC.host.Close() + }() + + connect(t, ctx, dhtA, dhtB) + connect(t, ctx, dhtB, dhtC) + + // we ONLY init bootstrap on A + dhtA.Bootstrap(ctx) + // and wait for one round to complete i.e. A should be connected to both B & C + waitForWellFormedTables(t, []*IpfsDHT{dhtA}, 2, 2, 20*time.Second) + + // now we create two new peers + dhtD := setupDHT(ctx, t, false) + dhtE := setupDHT(ctx, t, false) + + // connect them to each other + connect(t, ctx, dhtD, dhtE) + defer func() { + dhtD.Close() + dhtD.host.Close() + + dhtE.Close() + dhtE.host.Close() + }() + + // and then, on connecting the peer D to A, the min RT threshold gets triggered on A which leads to a bootstrap. + // since the default bootstrap scan interval is 30 mins - 1 hour, we can be sure that if bootstrap happens, + // it is because of the min RT threshold getting triggered (since default min value is 4 & we only have 2 peers in the RT when D gets connected) + connect(t, ctx, dhtA, dhtD) + + // and because of the above bootstrap, A also discovers E ! + waitForWellFormedTables(t, []*IpfsDHT{dhtA}, 4, 4, 20*time.Second) + assert.Equal(t, dhtE.self, dhtA.routingTable.Find(dhtE.self), "A's routing table should have peer E!") +} + func TestPeriodicBootstrap(t *testing.T) { if ci.IsRunning() { t.Skip("skipping on CI. highly timing dependent") @@ -711,9 +757,6 @@ func TestPeriodicBootstrap(t *testing.T) { } }() - cfg := DefaultBootstrapConfig - cfg.Queries = 5 - t.Logf("dhts are not connected. %d", nDHTs) for _, dht := range dhts { rtlen := dht.routingTable.Size() @@ -740,7 +783,7 @@ func TestPeriodicBootstrap(t *testing.T) { t.Logf("bootstrapping them so they find each other. 
%d", nDHTs) for _, dht := range dhts { - go dht.BootstrapOnce(ctx, cfg) + go dht.bootstrapOnce(ctx) } // this is async, and we dont know when it's finished with one cycle, so keep checking diff --git a/go.mod b/go.mod index 55b7713..bf24e9f 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.12 require ( github.com/gogo/protobuf v1.3.0 + github.com/google/uuid v1.1.1 github.com/hashicorp/golang-lru v0.5.3 github.com/ipfs/go-cid v0.0.3 github.com/ipfs/go-datastore v0.1.0 @@ -24,6 +25,7 @@ require ( github.com/multiformats/go-multiaddr v0.0.4 github.com/multiformats/go-multiaddr-dns v0.0.3 github.com/multiformats/go-multistream v0.1.0 + github.com/pkg/errors v0.8.1 github.com/stretchr/testify v1.3.0 github.com/whyrusleeping/base32 v0.0.0-20170828182744-c30ac30633cc go.opencensus.io v0.22.1 diff --git a/lookup.go b/lookup.go index 41ef735..a00fbfb 100644 --- a/lookup.go +++ b/lookup.go @@ -8,7 +8,7 @@ import ( "github.com/libp2p/go-libp2p-core/peer" - cid "github.com/ipfs/go-cid" + "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log" pb "github.com/libp2p/go-libp2p-kad-dht/pb" kb "github.com/libp2p/go-libp2p-kbucket" @@ -103,6 +103,9 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee } if res != nil && res.queriedSet != nil { + // refresh the k-bucket containing this key as the query was successful + dht.routingTable.BucketForID(kb.ConvertKey(key)).ResetRefreshedAt(time.Now()) + sorted := kb.SortClosestPeers(res.queriedSet.Peers(), kb.ConvertKey(key)) l := len(sorted) if l > dht.bucketSize { diff --git a/notif.go b/notif.go index 3af7584..556cadd 100644 --- a/notif.go +++ b/notif.go @@ -32,7 +32,14 @@ func (nn *netNotifiee) Connected(n network.Network, v network.Conn) { dht.plk.Lock() defer dht.plk.Unlock() if dht.host.Network().Connectedness(p) == network.Connected { + bootstrap := dht.routingTable.Size() <= minRTBootstrapThreshold dht.Update(dht.Context(), p) + if bootstrap { + select { + case dht.triggerBootstrap <- struct{}{}: + default: + } + } } return } @@ -71,7 +78,14 @@ func (nn *netNotifiee) testConnection(v network.Conn) { dht.plk.Lock() defer dht.plk.Unlock() if dht.host.Network().Connectedness(p) == network.Connected { + bootstrap := dht.routingTable.Size() <= minRTBootstrapThreshold dht.Update(dht.Context(), p) + if bootstrap { + select { + case dht.triggerBootstrap <- struct{}{}: + default: + } + } } } diff --git a/opts/options.go b/opts/options.go index 5840d39..2fbb91e 100644 --- a/opts/options.go +++ b/opts/options.go @@ -2,11 +2,12 @@ package dhtopts import ( "fmt" + "time" ds "github.com/ipfs/go-datastore" dssync "github.com/ipfs/go-datastore/sync" - protocol "github.com/libp2p/go-libp2p-core/protocol" - record "github.com/libp2p/go-libp2p-record" + "github.com/libp2p/go-libp2p-core/protocol" + "github.com/libp2p/go-libp2p-record" ) // Deprecated: The old format did not support more than one message per stream, and is not supported @@ -18,13 +19,22 @@ var ( DefaultProtocols = []protocol.ID{ProtocolDHT} ) +// BootstrapConfig specifies parameters used for bootstrapping the DHT. 
+type BootstrapConfig struct { + BucketPeriod time.Duration // how long to wait for a k-bucket to be queried before doing a random walk on it + Timeout time.Duration // how long to wait for a bootstrap query to run + RoutingTableScanInterval time.Duration // how often to scan the RT for k-buckets that haven't been queried since the given period + SelfQueryInterval time.Duration // how often to query for self +} + // Options is a structure containing all the options that can be used when constructing a DHT. type Options struct { - Datastore ds.Batching - Validator record.Validator - Client bool - Protocols []protocol.ID - BucketSize int + Datastore ds.Batching + Validator record.Validator + Client bool + Protocols []protocol.ID + BucketSize int + BootstrapConfig BootstrapConfig } // Apply applies the given options to this Option @@ -48,9 +58,30 @@ var Defaults = func(o *Options) error { } o.Datastore = dssync.MutexWrap(ds.NewMapDatastore()) o.Protocols = DefaultProtocols + + o.BootstrapConfig = BootstrapConfig{ + // same as that mentioned in the kad dht paper + BucketPeriod: 1 * time.Hour, + + // since the default bucket period is 1 hour, a scan interval of 30 minutes sounds reasonable + RoutingTableScanInterval: 30 * time.Minute, + + Timeout: 10 * time.Second, + + SelfQueryInterval: 1 * time.Hour, + } + return nil } +// Bootstrap configures the dht bootstrapping process +func Bootstrap(b BootstrapConfig) Option { + return func(o *Options) error { + o.BootstrapConfig = b + return nil + } +} + // Datastore configures the DHT to use the specified datastore. // // Defaults to an in-memory (temporary) map. diff --git a/routing.go b/routing.go index 859e966..1cfe293 100644 --- a/routing.go +++ b/routing.go @@ -13,7 +13,7 @@ import ( "github.com/libp2p/go-libp2p-core/peerstore" "github.com/libp2p/go-libp2p-core/routing" - cid "github.com/ipfs/go-cid" + "github.com/ipfs/go-cid" u "github.com/ipfs/go-ipfs-util" logging "github.com/ipfs/go-log" pb "github.com/libp2p/go-libp2p-kad-dht/pb" @@ -98,6 +98,7 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts }(p) } wg.Wait() + return nil } @@ -380,6 +381,9 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, nvals int) (<-cha // // We'll just call this a success. if got > 0 && (err == routing.ErrNotFound || reqCtx.Err() == context.DeadlineExceeded) { + // refresh the k-bucket containing this key as the query was successful + dht.routingTable.BucketForID(kb.ConvertKey(key)).ResetRefreshedAt(time.Now()) + err = nil } done(err) @@ -486,6 +490,7 @@ func (dht *IpfsDHT) FindProviders(ctx context.Context, c cid.Cid) ([]peer.AddrIn func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key cid.Cid, count int) <-chan peer.AddrInfo { logger.Event(ctx, "findProviders", key) peerOut := make(chan peer.AddrInfo, count) + go dht.findProvidersAsyncRoutine(ctx, key, count, peerOut) return peerOut } @@ -591,6 +596,9 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key cid.Cid, Extra: err.Error(), }) } + + // refresh the k-bucket containing this key after the query is run + dht.routingTable.BucketForID(kb.ConvertKey(key.KeyString())).ResetRefreshedAt(time.Now()) } // FindPeer searches for a peer with given ID. 
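
The opts changes above are the only knobs a consumer needs to touch. The sketch below shows how a caller might construct a DHT with a custom BootstrapConfig and start the periodic loops; it is illustrative only: the interval values are arbitrary, and the host construction via libp2p.New comes from go-libp2p rather than from this patch.

package main

import (
	"context"
	"time"

	libp2p "github.com/libp2p/go-libp2p"
	dht "github.com/libp2p/go-libp2p-kad-dht"
	opts "github.com/libp2p/go-libp2p-kad-dht/opts"
)

func main() {
	ctx := context.Background()

	// Host construction is assumed from go-libp2p; not part of this patch.
	h, err := libp2p.New(ctx)
	if err != nil {
		panic(err)
	}

	// Override the defaults set in opts.Defaults (1h BucketPeriod, 30m
	// RoutingTableScanInterval, 10s Timeout, 1h SelfQueryInterval).
	cfg := opts.BootstrapConfig{
		BucketPeriod:             30 * time.Minute,
		RoutingTableScanInterval: 10 * time.Minute,
		Timeout:                  10 * time.Second,
		SelfQueryInterval:        30 * time.Minute,
	}

	d, err := dht.New(ctx, h, opts.Bootstrap(cfg))
	if err != nil {
		panic(err)
	}

	// Bootstrap returns immediately; the self-query and bucket-scan loops run
	// in the background until ctx is cancelled.
	if err := d.Bootstrap(ctx); err != nil {
		panic(err)
	}
}
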
@@ -662,6 +670,9 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo, return peer.AddrInfo{}, err } + // refresh the k-bucket containing this key since the lookup was successful + dht.routingTable.BucketForID(kb.ConvertPeerID(id)).ResetRefreshedAt(time.Now()) + logger.Debugf("FindPeer %v %v", id, result.success) if result.peer.ID == "" { return peer.AddrInfo{}, routing.ErrNotFound @@ -730,6 +741,9 @@ func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (< logger.Debug(err) } + // refresh the k-bucket containing this key + dht.routingTable.BucketForID(kb.ConvertPeerID(id)).ResetRefreshedAt(time.Now()) + // close the peerchan channel when done. close(peerchan) }()
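
The notif.go hooks and the Bootstrap loop cooperate through a small coalescing pattern: a connection event does a non-blocking send on triggerBootstrap, and the bucket-scan goroutine treats a received trigger like an early timer tick. If the loop is not ready to receive, the send falls through to default, so notifiers are never blocked and redundant triggers are dropped. The following is a self-contained sketch of that pattern; the names (worker, trigger) are hypothetical and not part of this patch.

package main

import (
	"context"
	"fmt"
	"time"
)

// worker mimics the bucket-scan goroutine in Bootstrap: do one round of work,
// then sleep until the scan interval elapses, a trigger arrives, or ctx ends.
func worker(ctx context.Context, scanInterval time.Duration, trigger <-chan struct{}) {
	for {
		fmt.Println("bootstrap round at", time.Now().Format(time.RFC3339))
		select {
		case <-time.After(scanInterval): // periodic path
		case <-trigger: // event-driven path: routing table fell below the threshold
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	trigger := make(chan struct{})
	go worker(ctx, time.Hour, trigger)

	// What the Connected notification does when the routing table is small:
	time.Sleep(100 * time.Millisecond) // let the worker reach its select
	select {
	case trigger <- struct{}{}: // wake the worker immediately
	default: // worker is busy or a trigger is pending; don't block the notifier
	}

	<-ctx.Done()
}
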