314 lines
8.1 KiB
Go
Raw Normal View History

2018-03-25 11:38:08 +02:00
// Package dht implements a query manager to drive concurrent workers
// to query the DHT. A query is set up with a target key, a queryFunc tasked
// to communicate with a peer, and a set of initial peers. As the query
// progresses, queryFunc can return closer peers that will be used to navigate
// closer to the target key in the DHT until an answer is reached.
2014-09-17 07:19:40 -07:00
package dht
import (
2016-09-30 10:24:03 -07:00
"context"
2014-09-18 19:30:04 -07:00
"sync"
u "github.com/ipfs/go-ipfs-util"
logging "github.com/ipfs/go-log"
2016-09-02 20:21:23 +01:00
todoctr "github.com/ipfs/go-todocounter"
process "github.com/jbenet/goprocess"
ctxproc "github.com/jbenet/goprocess/context"
2018-06-06 05:20:14 -07:00
inet "github.com/libp2p/go-libp2p-net"
peer "github.com/libp2p/go-libp2p-peer"
pset "github.com/libp2p/go-libp2p-peer/peerset"
pstore "github.com/libp2p/go-libp2p-peerstore"
queue "github.com/libp2p/go-libp2p-peerstore/queue"
2016-09-02 20:21:23 +01:00
routing "github.com/libp2p/go-libp2p-routing"
notif "github.com/libp2p/go-libp2p-routing/notifications"
2014-09-17 07:19:40 -07:00
)
// maxQueryConcurrency is the concurrency assigned to every query built by
// newQuery. It is initialised from AlphaValue.
var maxQueryConcurrency = AlphaValue
2014-09-18 19:30:04 -07:00
2014-09-17 07:19:40 -07:00
type dhtQuery struct {
2015-01-01 12:45:39 -08:00
dht *IpfsDHT
key string // the key we're querying for
2015-01-01 12:45:39 -08:00
qfunc queryFunc // the function to execute per peer
concurrency int // the concurrency parameter
2014-09-18 19:30:04 -07:00
}
type dhtQueryResult struct {
value []byte // GetValue
peer *pstore.PeerInfo // FindPeer
providerPeers []pstore.PeerInfo // GetProviders
closerPeers []*pstore.PeerInfo // *
2014-09-18 19:30:04 -07:00
success bool
finalSet *pset.PeerSet
queriedSet *pset.PeerSet
2014-09-18 19:30:04 -07:00
}
// constructs query
func (dht *IpfsDHT) newQuery(k string, f queryFunc) *dhtQuery {
2014-09-18 19:30:04 -07:00
return &dhtQuery{
key: k,
2015-01-01 12:45:39 -08:00
dht: dht,
2014-09-18 19:30:04 -07:00
qfunc: f,
concurrency: maxQueryConcurrency,
}
2014-09-17 07:19:40 -07:00
}
// queryFunc is a function that runs a particular query with a given peer.
// It returns either:
// - the value (a result with success set, which ends the query)
// - a list of peers potentially better able to serve the query (closerPeers)
// - an error
type queryFunc func(context.Context, peer.ID) (*dhtQueryResult, error)
2014-09-18 19:30:04 -07:00
// Run runs the query at hand. pass in a list of peers to use first.
func (q *dhtQuery) Run(ctx context.Context, peers []peer.ID) (*dhtQueryResult, error) {
2015-03-07 02:44:20 -08:00
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
runner := newQueryRunner(q)
return runner.Run(ctx, peers)
2014-09-18 19:30:04 -07:00
}
type dhtQueryRunner struct {
2015-01-05 04:35:54 -08:00
query *dhtQuery // query to run
peersSeen *pset.PeerSet // all peers queried. prevent querying same peer 2x
peersQueried *pset.PeerSet // peers successfully connected to and queried
2015-01-05 04:35:54 -08:00
peersToQuery *queue.ChanQueue // peers remaining to be queried
peersRemaining todoctr.Counter // peersToQuery + currently processing
2014-09-18 19:30:04 -07:00
2015-01-05 04:35:54 -08:00
result *dhtQueryResult // query result
errs u.MultiErr // result errors. maybe should be a map[peer.ID]error
2014-09-18 19:30:04 -07:00
2015-01-05 04:35:54 -08:00
rateLimit chan struct{} // processing semaphore
log logging.EventLogger
2014-09-18 19:30:04 -07:00
runCtx context.Context
proc process.Process
2014-09-18 19:30:04 -07:00
sync.RWMutex
}
func newQueryRunner(q *dhtQuery) *dhtQueryRunner {
proc := process.WithParent(process.Background())
ctx := ctxproc.OnClosingContext(proc)
2014-09-18 19:30:04 -07:00
return &dhtQueryRunner{
query: q,
peersToQuery: queue.NewChanQueue(ctx, queue.NewXORDistancePQ(string(q.key))),
2014-09-18 19:30:04 -07:00
peersRemaining: todoctr.NewSyncCounter(),
peersSeen: pset.New(),
peersQueried: pset.New(),
2014-09-18 19:30:04 -07:00
rateLimit: make(chan struct{}, q.concurrency),
proc: proc,
2014-09-18 19:30:04 -07:00
}
}
func (r *dhtQueryRunner) Run(ctx context.Context, peers []peer.ID) (*dhtQueryResult, error) {
2015-01-05 04:35:54 -08:00
r.log = log
r.runCtx = ctx
2015-01-05 04:35:54 -08:00
if len(peers) == 0 {
log.Warning("Running query with no peers!")
return nil, nil
}
2014-09-18 19:30:04 -07:00
// setup concurrency rate limiting
for i := 0; i < r.query.concurrency; i++ {
r.rateLimit <- struct{}{}
2014-09-17 07:19:40 -07:00
}
2014-09-18 19:30:04 -07:00
// add all the peers we got first.
for _, p := range peers {
r.addPeerToQuery(p)
2015-03-07 02:44:20 -08:00
}
2014-09-18 19:41:41 -07:00
// go do this thing.
// do it as a child proc to make sure Run exits
// ONLY AFTER spawn workers has exited.
r.proc.Go(r.spawnWorkers)
2014-09-18 19:41:41 -07:00
// so workers are working.
// wait until they're done.
err := routing.ErrNotFound
2014-09-19 08:07:56 -07:00
// now, if the context finishes, close the proc.
// we have to do it here because the logic before is setup, which
// should run without closing the proc.
ctxproc.CloseAfterContext(r.proc, ctx)
2014-09-17 07:19:40 -07:00
select {
2014-09-18 19:30:04 -07:00
case <-r.peersRemaining.Done():
r.proc.Close()
2014-09-18 19:30:04 -07:00
r.RLock()
defer r.RUnlock()
err = routing.ErrNotFound
// if every query to every peer failed, something must be very wrong.
if len(r.errs) > 0 && len(r.errs) == r.peersSeen.Size() {
log.Debugf("query errs: %s", r.errs)
err = r.errs[0]
2014-09-18 19:30:04 -07:00
}
case <-r.proc.Closed():
2014-09-18 19:30:04 -07:00
r.RLock()
defer r.RUnlock()
err = context.DeadlineExceeded
2014-09-19 08:07:56 -07:00
}
2014-09-18 19:30:04 -07:00
2014-09-19 08:07:56 -07:00
if r.result != nil && r.result.success {
return r.result, nil
2014-09-18 19:30:04 -07:00
}
return &dhtQueryResult{
finalSet: r.peersSeen,
queriedSet: r.peersQueried,
}, err
2014-09-18 19:30:04 -07:00
}
func (r *dhtQueryRunner) addPeerToQuery(next peer.ID) {
2014-11-21 08:03:11 -08:00
// if new peer is ourselves...
2015-01-01 12:45:39 -08:00
if next == r.query.dht.self {
2015-01-05 04:35:54 -08:00
r.log.Debug("addPeerToQuery skip self")
2014-11-21 08:03:11 -08:00
return
}
if !r.peersSeen.TryAdd(next) {
2014-09-18 19:30:04 -07:00
return
}
notif.PublishQueryEvent(r.runCtx, &notif.QueryEvent{
Type: notif.AddingPeer,
ID: next,
})
2014-09-18 19:30:04 -07:00
r.peersRemaining.Increment(1)
select {
case r.peersToQuery.EnqChan <- next:
case <-r.proc.Closing():
2014-09-17 07:19:40 -07:00
}
2014-09-18 19:30:04 -07:00
}
func (r *dhtQueryRunner) spawnWorkers(proc process.Process) {
2014-09-18 19:30:04 -07:00
for {
2014-09-18 19:41:41 -07:00
2014-09-18 19:30:04 -07:00
select {
case <-r.peersRemaining.Done():
return
case <-r.proc.Closing():
2014-09-18 19:30:04 -07:00
return
case <-r.rateLimit:
select {
case p, more := <-r.peersToQuery.DeqChan:
if !more {
return // channel closed.
}
// do it as a child func to make sure Run exits
// ONLY AFTER spawn workers has exited.
proc.Go(func(proc process.Process) {
r.queryPeer(proc, p)
})
case <-r.proc.Closing():
return
case <-r.peersRemaining.Done():
return
2014-09-18 19:41:41 -07:00
}
2014-09-18 19:30:04 -07:00
}
}
}
2014-09-17 07:19:40 -07:00
func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID) {
// ok let's do this!
// create a context from our proc.
ctx := ctxproc.OnClosingContext(proc)
// make sure we do this when we exit
defer func() {
2018-03-25 11:38:08 +02:00
// signal we're done processing peer p
r.peersRemaining.Decrement(1)
r.rateLimit <- struct{}{}
}()
// make sure we're connected to the peer.
log(dht): remove lots of query debug logs the debug log is flooded with pages upon pages of... we've gotta be more judicious with our use of console logs. i'm sure there's interesting actionable information in here. let's use the console logging more like a sniper rifle and less like birdshot. feel free to revert if there are specific critical statements in this changeset 03:05:24.096 DEBUG dht: dht(<peer.ID QmWGN3>).Query(QmXvrpUZXCYaCkf1jfaQTJASS91xd47Yih2rnVC5YbFAAK).Run(3) queryPeer(<peer.ID QmSoLp>) QUERY worker for: <peer.ID QmSoLp> - not found, and no closer peers. prefixlog.go:107 03:05:24.096 DEBUG dht: dht(<peer.ID QmWGN3>).Query(QmXvrpUZXCYaCkf1jfaQTJASS91xd47Yih2rnVC5YbFAAK).Run(3) queryPeer(<peer.ID QmSoLp>) completed prefixlog.go:107 03:05:24.096 DEBUG dht: dht(<peer.ID QmWGN3>).Query(QmXvrpUZXCYaCkf1jfaQTJASS91xd47Yih2rnVC5YbFAAK).Run(3) queryPeer(<peer.ID QmSoLp>) finished prefixlog.go:107 03:05:24.096 DEBUG dht: dht(<peer.ID QmWGN3>) FindProviders(QmXvrpUZXCYaCkf1jfaQTJASS91xd47Yih2rnVC5YbFAAK) Query(<peer.ID QmSoLn>) 0 provider entries prefixlog.go:107 03:05:24.096 DEBUG dht: dht(<peer.ID QmWGN3>) FindProviders(QmXvrpUZXCYaCkf1jfaQTJASS91xd47Yih2rnVC5YbFAAK) Query(<peer.ID QmSoLn>) 0 provider entries decoded prefixlog.go:107 03:05:24.096 DEBUG dht: dht(<peer.ID QmWGN3>) FindProviders(QmXvrpUZXCYaCkf1jfaQTJASS91xd47Yih2rnVC5YbFAAK) Query(<peer.ID QmSoLn>) got closer peers: 0 [] prefixlog.go:107 03:05:24.097 DEBUG dht: dht(<peer.ID QmWGN3>) FindProviders(QmXvrpUZXCYaCkf1jfaQTJASS91xd47Yih2rnVC5YbFAAK) Query(<peer.ID QmSoLn>) end prefixlog.go:107 03:05:24.097 DEBUG dht: dht(<peer.ID QmWGN3>).Query(QmXvrpUZXCYaCkf1jfaQTJASS91xd47Yih2rnVC5YbFAAK).Run(3) queryPeer(<peer.ID QmSoLn>) query finished prefixlog.go:107 03:05:24.097 DEBUG dht: dht(<peer.ID QmWGN3>).Query(QmXvrpUZXCYaCkf1jfaQTJASS91xd47Yih2rnVC5YbFAAK).Run(3) queryPeer(<peer.ID QmSoLn>) QUERY worker for: <peer.ID QmSoLn> - not found, and no closer peers. 
prefixlog.go:107 03:05:24.097 DEBUG dht: dht(<peer.ID QmWGN3>).Query(QmXvrpUZXCYaCkf1jfaQTJASS91xd47Yih2rnVC5YbFAAK).Run(3) queryPeer(<peer.ID QmSoLn>) completed prefixlog.go:107 03:05:24.097 DEBUG dht: dht(<peer.ID QmWGN3>).Query(QmXvrpUZXCYaCkf1jfaQTJASS91xd47Yih2rnVC5YbFAAK).Run(3) queryPeer(<peer.ID QmSoLn>) finished prefixlog.go:107 03:05:24.097 DEBUG dht: dht(<peer.ID QmWGN3>).Query(QmXvrpUZXCYaCkf1jfaQTJASS91xd47Yih2rnVC5YbFAAK).Run(3) all peers ended prefixlog.go:107 03:05:24.097 DEBUG dht: dht(<peer.ID QmWGN3>).Query(QmXvrpUZXCYaCkf1jfaQTJASS91xd47Yih2rnVC5YbFAAK).Run(3) spawnWorkers end prefixlog.go:107 03:05:24.097 DEBUG dht: dht(<peer.ID QmWGN3>).Query(QmXvrpUZXCYaCkf1jfaQTJASS91xd47Yih2rnVC5YbFAAK).Run(3) failure: %s routing: not found prefixlog.go:107 03:05:24.097 DEBUG dht: dht(<peer.ID QmWGN3>).Query(QmXvrpUZXCYaCkf1jfaQTJASS91xd47Yih2rnVC5YbFAAK).Run(3) end prefixlog.go:107
2015-01-20 03:05:30 -08:00
// FIXME abstract away into the network layer
2018-06-06 05:20:14 -07:00
// Note: Failure to connect in this block will cause the function to
// short circuit.
if r.query.dht.host.Network().Connectedness(p) == inet.NotConnected {
log.Debug("not connected. dialing.")
notif.PublishQueryEvent(r.runCtx, &notif.QueryEvent{
Type: notif.DialingPeer,
ID: p,
})
2015-01-22 07:34:26 -08:00
// while we dial, we do not take up a rate limit. this is to allow
// forward progress during potentially very high latency dials.
r.rateLimit <- struct{}{}
pi := pstore.PeerInfo{ID: p}
if err := r.query.dht.host.Connect(ctx, pi); err != nil {
2015-01-05 04:35:54 -08:00
log.Debugf("Error connecting: %s", err)
notif.PublishQueryEvent(r.runCtx, &notif.QueryEvent{
Type: notif.QueryError,
Extra: err.Error(),
ID: p,
})
r.Lock()
r.errs = append(r.errs, err)
r.Unlock()
2015-01-22 07:34:26 -08:00
<-r.rateLimit // need to grab it again, as we deferred.
return
}
2015-01-22 07:34:26 -08:00
<-r.rateLimit // need to grab it again, as we deferred.
2015-01-05 04:35:54 -08:00
log.Debugf("connected. dial success.")
}
2014-09-18 19:41:41 -07:00
2014-09-18 19:30:04 -07:00
// finally, run the query against this peer
res, err := r.query.qfunc(ctx, p)
2014-09-18 19:30:04 -07:00
r.peersQueried.Add(p)
2014-09-18 19:30:04 -07:00
if err != nil {
2014-10-30 06:35:29 -07:00
log.Debugf("ERROR worker for: %v %v", p, err)
2014-09-18 19:30:04 -07:00
r.Lock()
r.errs = append(r.errs, err)
r.Unlock()
} else if res.success {
2015-01-05 04:35:54 -08:00
log.Debugf("SUCCESS worker for: %v %s", p, res)
2014-09-18 19:30:04 -07:00
r.Lock()
r.result = res
r.Unlock()
go r.proc.Close() // signal to everyone that we're done.
// must be async, as we're one of the children, and Close blocks.
2014-09-18 19:30:04 -07:00
} else if len(res.closerPeers) > 0 {
log.Debugf("PEERS CLOSER -- worker for: %v (%d closer peers)", p, len(res.closerPeers))
2014-09-18 19:30:04 -07:00
for _, next := range res.closerPeers {
2018-03-25 11:38:08 +02:00
if next.ID == r.query.dht.self { // don't add self.
log.Debugf("PEERS CLOSER -- worker for: %v found self", p)
continue
}
// add their addresses to the dialer's peerstore
r.query.dht.peerstore.AddAddrs(next.ID, next.Addrs, pstore.TempAddrTTL)
r.addPeerToQuery(next.ID)
log.Debugf("PEERS CLOSER -- worker for: %v added %v (%v)", p, next.ID, next.Addrs)
2014-09-18 19:30:04 -07:00
}
} else {
log.Debugf("QUERY worker for: %v - not found, and no closer peers.", p)
2014-09-17 07:19:40 -07:00
}
}