275 lines
7.1 KiB
Go
Raw Normal View History

2014-09-17 07:19:40 -07:00
package dht
import (
2014-09-18 19:30:04 -07:00
"sync"
peer "github.com/jbenet/go-ipfs/p2p/peer"
queue "github.com/jbenet/go-ipfs/p2p/peer/queue"
"github.com/jbenet/go-ipfs/routing"
eventlog "github.com/jbenet/go-ipfs/thirdparty/eventlog"
2014-09-17 07:19:40 -07:00
u "github.com/jbenet/go-ipfs/util"
pset "github.com/jbenet/go-ipfs/util/peerset"
2014-09-18 19:30:04 -07:00
todoctr "github.com/jbenet/go-ipfs/util/todocounter"
2014-09-17 07:19:40 -07:00
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
ctxgroup "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
2014-09-17 07:19:40 -07:00
)
// maxQueryConcurrency is the concurrency value newQuery assigns to every
// query it builds. It is initialised from AlphaValue, which is not declared
// in this section of the file.
var maxQueryConcurrency = AlphaValue
2014-09-18 19:30:04 -07:00
2014-09-17 07:19:40 -07:00
type dhtQuery struct {
2015-01-01 12:45:39 -08:00
dht *IpfsDHT
key u.Key // the key we're querying for
qfunc queryFunc // the function to execute per peer
concurrency int // the concurrency parameter
2014-09-18 19:30:04 -07:00
}
type dhtQueryResult struct {
value []byte // GetValue
peer peer.PeerInfo // FindPeer
providerPeers []peer.PeerInfo // GetProviders
closerPeers []peer.PeerInfo // *
2014-09-18 19:30:04 -07:00
success bool
}
// constructs query
2015-01-01 12:45:39 -08:00
func (dht *IpfsDHT) newQuery(k u.Key, f queryFunc) *dhtQuery {
2014-09-18 19:30:04 -07:00
return &dhtQuery{
key: k,
2015-01-01 12:45:39 -08:00
dht: dht,
2014-09-18 19:30:04 -07:00
qfunc: f,
concurrency: maxQueryConcurrency,
}
2014-09-17 07:19:40 -07:00
}
// queryFunc is a function that runs a particular query with a given peer.
// It returns either:
// - the value
// - a list of peers potentially better able to serve the query
// - an error
type queryFunc func(context.Context, peer.ID) (*dhtQueryResult, error)
2014-09-18 19:30:04 -07:00
// Run runs the query at hand. pass in a list of peers to use first.
func (q *dhtQuery) Run(ctx context.Context, peers []peer.ID) (*dhtQueryResult, error) {
2014-09-18 19:30:04 -07:00
runner := newQueryRunner(ctx, q)
return runner.Run(peers)
}
type dhtQueryRunner struct {
2015-01-05 04:35:54 -08:00
query *dhtQuery // query to run
peersSeen *pset.PeerSet // all peers queried. prevent querying same peer 2x
peersToQuery *queue.ChanQueue // peers remaining to be queried
peersRemaining todoctr.Counter // peersToQuery + currently processing
2014-09-18 19:30:04 -07:00
2015-01-05 04:35:54 -08:00
result *dhtQueryResult // query result
errs u.MultiErr // result errors. maybe should be a map[peer.ID]error
2014-09-18 19:30:04 -07:00
2015-01-05 04:35:54 -08:00
rateLimit chan struct{} // processing semaphore
log eventlog.EventLogger
2014-09-18 19:30:04 -07:00
cg ctxgroup.ContextGroup
2014-09-18 19:30:04 -07:00
sync.RWMutex
}
func newQueryRunner(ctx context.Context, q *dhtQuery) *dhtQueryRunner {
return &dhtQueryRunner{
query: q,
peersToQuery: queue.NewChanQueue(ctx, queue.NewXORDistancePQ(q.key)),
peersRemaining: todoctr.NewSyncCounter(),
peersSeen: pset.New(),
2014-09-18 19:30:04 -07:00
rateLimit: make(chan struct{}, q.concurrency),
cg: ctxgroup.WithContext(ctx),
2014-09-18 19:30:04 -07:00
}
}
func (r *dhtQueryRunner) Run(peers []peer.ID) (*dhtQueryResult, error) {
2015-01-05 04:35:54 -08:00
log := log.Prefix("dht(%s).Query(%s).Run(%d)", r.query.dht.self, r.query.key, len(peers))
r.log = log
log.Debug("enter")
defer log.Debug("end")
2014-10-30 06:35:29 -07:00
log.Debugf("Run query with %d peers.", len(peers))
if len(peers) == 0 {
log.Warning("Running query with no peers!")
return nil, nil
}
2014-09-18 19:30:04 -07:00
// setup concurrency rate limiting
for i := 0; i < r.query.concurrency; i++ {
r.rateLimit <- struct{}{}
2014-09-17 07:19:40 -07:00
}
2014-09-18 19:30:04 -07:00
// add all the peers we got first.
for _, p := range peers {
r.addPeerToQuery(r.cg.Context(), p)
2014-09-17 07:19:40 -07:00
}
2014-09-18 19:41:41 -07:00
// go do this thing.
// do it as a child func to make sure Run exits
// ONLY AFTER spawn workers has exited.
2015-01-05 04:35:54 -08:00
log.Debugf("go spawn workers")
r.cg.AddChildFunc(r.spawnWorkers)
2014-09-18 19:41:41 -07:00
// so workers are working.
// wait until they're done.
err := routing.ErrNotFound
2014-09-19 08:07:56 -07:00
2014-09-17 07:19:40 -07:00
select {
2014-09-18 19:30:04 -07:00
case <-r.peersRemaining.Done():
2015-01-05 04:35:54 -08:00
log.Debug("all peers ended")
r.cg.Close()
2014-09-18 19:30:04 -07:00
r.RLock()
defer r.RUnlock()
err = routing.ErrNotFound
// if every query to every peer failed, something must be very wrong.
if len(r.errs) > 0 && len(r.errs) == r.peersSeen.Size() {
log.Debugf("query errs: %s", r.errs)
err = r.errs[0]
2014-09-18 19:30:04 -07:00
}
case <-r.cg.Closed():
2015-01-05 04:35:54 -08:00
log.Debug("r.cg.Closed()")
2014-09-18 19:30:04 -07:00
r.RLock()
defer r.RUnlock()
err = r.cg.Context().Err() // collect the error.
2014-09-19 08:07:56 -07:00
}
2014-09-18 19:30:04 -07:00
2014-09-19 08:07:56 -07:00
if r.result != nil && r.result.success {
2015-01-05 04:35:54 -08:00
log.Debug("success: %s", r.result)
2014-09-19 08:07:56 -07:00
return r.result, nil
2014-09-18 19:30:04 -07:00
}
2015-01-05 04:35:54 -08:00
log.Debug("failure: %s", err)
2014-09-19 08:07:56 -07:00
return nil, err
2014-09-18 19:30:04 -07:00
}
func (r *dhtQueryRunner) addPeerToQuery(ctx context.Context, next peer.ID) {
2014-11-21 08:03:11 -08:00
// if new peer is ourselves...
2015-01-01 12:45:39 -08:00
if next == r.query.dht.self {
2015-01-05 04:35:54 -08:00
r.log.Debug("addPeerToQuery skip self")
2014-11-21 08:03:11 -08:00
return
}
if !r.peersSeen.TryAdd(next) {
2015-01-05 04:35:54 -08:00
r.log.Debugf("addPeerToQuery skip seen %s", next)
2014-09-18 19:30:04 -07:00
return
}
2015-01-05 04:35:54 -08:00
r.log.Debugf("addPeerToQuery adding %s", next)
2014-09-18 19:30:04 -07:00
r.peersRemaining.Increment(1)
select {
case r.peersToQuery.EnqChan <- next:
case <-ctx.Done():
2014-09-17 07:19:40 -07:00
}
2014-09-18 19:30:04 -07:00
}
func (r *dhtQueryRunner) spawnWorkers(parent ctxgroup.ContextGroup) {
2015-01-05 04:35:54 -08:00
log := r.log.Prefix("spawnWorkers")
log.Debugf("begin")
defer log.Debugf("end")
2014-09-18 19:30:04 -07:00
for {
2014-09-18 19:41:41 -07:00
2014-09-18 19:30:04 -07:00
select {
case <-r.peersRemaining.Done():
return
case <-r.cg.Closing():
2014-09-18 19:30:04 -07:00
return
2014-09-18 19:41:41 -07:00
case p, more := <-r.peersToQuery.DeqChan:
if !more {
return // channel closed.
}
log.Debugf("spawning worker for: %v", p)
// do it as a child func to make sure Run exits
// ONLY AFTER spawn workers has exited.
parent.AddChildFunc(func(cg ctxgroup.ContextGroup) {
r.queryPeer(cg, p)
})
2014-09-18 19:30:04 -07:00
}
}
}
2014-09-17 07:19:40 -07:00
func (r *dhtQueryRunner) queryPeer(cg ctxgroup.ContextGroup, p peer.ID) {
2015-01-05 04:35:54 -08:00
log := r.log.Prefix("queryPeer(%s)", p)
log.Debugf("spawned")
defer log.Debugf("finished")
2014-09-18 19:41:41 -07:00
2014-09-18 19:30:04 -07:00
// make sure we rate limit concurrency.
select {
case <-r.rateLimit:
case <-cg.Closing():
2014-09-18 19:30:04 -07:00
r.peersRemaining.Decrement(1)
return
}
// ok let's do this!
2015-01-05 04:35:54 -08:00
log.Debugf("running")
// make sure we do this when we exit
defer func() {
// signal we're done proccessing peer p
2015-01-05 04:35:54 -08:00
log.Debugf("completed")
r.peersRemaining.Decrement(1)
r.rateLimit <- struct{}{}
}()
// make sure we're connected to the peer.
2015-01-01 12:45:39 -08:00
if conns := r.query.dht.host.Network().ConnsToPeer(p); len(conns) == 0 {
2015-01-05 04:35:54 -08:00
log.Infof("not connected. dialing.")
2015-01-01 12:45:39 -08:00
pi := peer.PeerInfo{ID: p}
if err := r.query.dht.host.Connect(cg.Context(), pi); err != nil {
2015-01-05 04:35:54 -08:00
log.Debugf("Error connecting: %s", err)
r.Lock()
r.errs = append(r.errs, err)
r.Unlock()
return
}
2015-01-05 04:35:54 -08:00
log.Debugf("connected. dial success.")
}
2014-09-18 19:41:41 -07:00
2014-09-18 19:30:04 -07:00
// finally, run the query against this peer
2015-01-05 04:35:54 -08:00
log.Debugf("query running")
res, err := r.query.qfunc(cg.Context(), p)
2015-01-05 04:35:54 -08:00
log.Debugf("query finished")
2014-09-18 19:30:04 -07:00
if err != nil {
2014-10-30 06:35:29 -07:00
log.Debugf("ERROR worker for: %v %v", p, err)
2014-09-18 19:30:04 -07:00
r.Lock()
r.errs = append(r.errs, err)
r.Unlock()
} else if res.success {
2015-01-05 04:35:54 -08:00
log.Debugf("SUCCESS worker for: %v %s", p, res)
2014-09-18 19:30:04 -07:00
r.Lock()
r.result = res
r.Unlock()
go r.cg.Close() // signal to everyone that we're done.
// must be async, as we're one of the children, and Close blocks.
2014-09-18 19:30:04 -07:00
} else if len(res.closerPeers) > 0 {
log.Debugf("PEERS CLOSER -- worker for: %v (%d closer peers)", p, len(res.closerPeers))
2014-09-18 19:30:04 -07:00
for _, next := range res.closerPeers {
if next.ID == r.query.dht.self { // dont add self.
log.Debugf("PEERS CLOSER -- worker for: %v found self", p)
continue
}
// add their addresses to the dialer's peerstore
2015-01-01 12:45:39 -08:00
r.query.dht.peerstore.AddPeerInfo(next)
r.addPeerToQuery(cg.Context(), next.ID)
log.Debugf("PEERS CLOSER -- worker for: %v added %v (%v)", p, next.ID, next.Addrs)
2014-09-18 19:30:04 -07:00
}
} else {
log.Debugf("QUERY worker for: %v - not found, and no closer peers.", p)
2014-09-17 07:19:40 -07:00
}
}