diff --git a/dht_test.go b/dht_test.go index 2e51961..01558ef 100644 --- a/dht_test.go +++ b/dht_test.go @@ -901,6 +901,41 @@ func TestLayeredGet(t *testing.T) { } } +func TestUnfindablePeer(t *testing.T) { + if testing.Short() { + t.SkipNow() + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + maddrs, peers, dhts := setupDHTS(ctx, 4, t) + defer func() { + for i := 0; i < 4; i++ { + dhts[i].Close() + dhts[i].host.Close() + } + }() + + connect(t, ctx, dhts[0], dhts[1]) + connect(t, ctx, dhts[1], dhts[2]) + connect(t, ctx, dhts[2], dhts[3]) + + // Give DHT 1 a bad addr for DHT 2. + dhts[1].host.Peerstore().ClearAddrs(peers[2]) + dhts[1].host.Peerstore().AddAddr(peers[2], maddrs[0], time.Minute) + + ctxT, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + _, err := dhts[0].FindPeer(ctxT, peers[3]) + if err == nil { + t.Error("should have failed to find peer") + } + if ctxT.Err() != nil { + t.Error("FindPeer should have failed before context expired") + } +} + func TestFindPeer(t *testing.T) { // t.Skip("skipping test to debug another") if testing.Short() { diff --git a/dial_queue.go b/dial_queue.go index 75cc703..9d58a42 100644 --- a/dial_queue.go +++ b/dial_queue.go @@ -315,7 +315,11 @@ func (dq *dialQueue) worker() { return case <-idleTimer.C: // no new dial requests during our idle period; time to scale down. - case p := <-dq.in.DeqChan: + case p, ok := <-dq.in.DeqChan: + if !ok { + return + } + t := time.Now() if err := dq.dialFn(dq.ctx, p); err != nil { logger.Debugf("discarding dialled peer because of error: %v", err) diff --git a/query.go b/query.go index 0a730d7..2219940 100644 --- a/query.go +++ b/query.go @@ -259,6 +259,9 @@ func (r *dhtQueryRunner) dialPeer(ctx context.Context, p peer.ID) error { r.Lock() r.errs = append(r.errs, err) r.Unlock() + + // This peer is dropping out of the race. + r.peersRemaining.Decrement(1) return err } logger.Debugf("connected. dial success.")