mirror of
https://github.com/fluencelabs/go-libp2p-kad-dht
synced 2025-04-25 06:42:13 +00:00
query: fix error "leak"
Long-running queries can build up large error sets that we never actually use. This is exacerbated by https://github.com/libp2p/go-libp2p-swarm/pull/115. fixes https://github.com/libp2p/go-libp2p-swarm/issues/119
This commit is contained in:
parent
ca611b1605
commit
3c9f5bcd92
32
query.go
32
query.go
@ -2,9 +2,9 @@ package dht
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync"
|
||||
|
||||
u "github.com/ipfs/go-ipfs-util"
|
||||
logging "github.com/ipfs/go-log"
|
||||
todoctr "github.com/ipfs/go-todocounter"
|
||||
process "github.com/jbenet/goprocess"
|
||||
@ -18,6 +18,9 @@ import (
|
||||
notif "github.com/libp2p/go-libp2p-routing/notifications"
|
||||
)
|
||||
|
||||
// ErrNoPeersQueried is returned when we failed to connect to any peers.
|
||||
var ErrNoPeersQueried = errors.New("failed to query any peers")
|
||||
|
||||
var maxQueryConcurrency = AlphaValue
|
||||
|
||||
type dhtQuery struct {
|
||||
@ -77,7 +80,6 @@ type dhtQueryRunner struct {
|
||||
peersRemaining todoctr.Counter // peersToQuery + currently processing
|
||||
|
||||
result *dhtQueryResult // query result
|
||||
errs u.MultiErr // result errors. maybe should be a map[peer.ID]error
|
||||
|
||||
rateLimit chan struct{} // processing semaphore
|
||||
log logging.EventLogger
|
||||
@ -155,23 +157,19 @@ func (r *dhtQueryRunner) Run(ctx context.Context, peers []peer.ID) (*dhtQueryRes
|
||||
select {
|
||||
case <-r.peersRemaining.Done():
|
||||
r.proc.Close()
|
||||
r.RLock()
|
||||
defer r.RUnlock()
|
||||
|
||||
err = routing.ErrNotFound
|
||||
|
||||
// if every query to every peer failed, something must be very wrong.
|
||||
if len(r.errs) > 0 && len(r.errs) == r.peersSeen.Size() {
|
||||
logger.Debugf("query errs: %s", r.errs)
|
||||
err = r.errs[0]
|
||||
if r.peersQueried.Size() == 0 {
|
||||
err = ErrNoPeersQueried
|
||||
} else {
|
||||
err = routing.ErrNotFound
|
||||
}
|
||||
|
||||
case <-r.proc.Closed():
|
||||
r.RLock()
|
||||
defer r.RUnlock()
|
||||
err = r.runCtx.Err()
|
||||
}
|
||||
|
||||
r.RLock()
|
||||
defer r.RUnlock()
|
||||
|
||||
if r.result != nil && r.result.success {
|
||||
return r.result, nil
|
||||
}
|
||||
@ -257,10 +255,6 @@ func (r *dhtQueryRunner) dialPeer(ctx context.Context, p peer.ID) error {
|
||||
ID: p,
|
||||
})
|
||||
|
||||
r.Lock()
|
||||
r.errs = append(r.errs, err)
|
||||
r.Unlock()
|
||||
|
||||
// This peer is dropping out of the race.
|
||||
r.peersRemaining.Decrement(1)
|
||||
return err
|
||||
@ -289,10 +283,6 @@ func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID) {
|
||||
|
||||
if err != nil {
|
||||
logger.Debugf("ERROR worker for: %v %v", p, err)
|
||||
r.Lock()
|
||||
r.errs = append(r.errs, err)
|
||||
r.Unlock()
|
||||
|
||||
} else if res.success {
|
||||
logger.Debugf("SUCCESS worker for: %v %s", p, res)
|
||||
r.Lock()
|
||||
|
Loading…
x
Reference in New Issue
Block a user