265 lines
5.8 KiB
Go
Raw Normal View History

2014-09-17 07:19:40 -07:00
package dht
import (
2014-09-18 19:30:04 -07:00
"sync"
inet "github.com/jbenet/go-ipfs/net"
2014-09-17 07:19:40 -07:00
peer "github.com/jbenet/go-ipfs/peer"
queue "github.com/jbenet/go-ipfs/peer/queue"
"github.com/jbenet/go-ipfs/routing"
2014-09-18 19:30:04 -07:00
kb "github.com/jbenet/go-ipfs/routing/kbucket"
2014-09-17 07:19:40 -07:00
u "github.com/jbenet/go-ipfs/util"
2014-09-18 19:30:04 -07:00
todoctr "github.com/jbenet/go-ipfs/util/todocounter"
2014-09-17 07:19:40 -07:00
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
)
// maxQueryConcurrency bounds how many peers a single query works on at once.
// It mirrors the DHT-wide Alpha parameter — presumably defined as AlphaValue
// elsewhere in this package (TODO confirm).
var maxQueryConcurrency = AlphaValue
2014-09-18 19:30:04 -07:00
2014-09-17 07:19:40 -07:00
// dhtQuery describes a single query to run against the DHT: the target key,
// the per-peer work function, and how to reach peers.
type dhtQuery struct {
	// the key we're querying for
	key u.Key

	// dialer used to ensure we're connected to peers
	dialer inet.Dialer

	// the function to execute per peer
	qfunc queryFunc

	// the concurrency parameter
	concurrency int
}
// dhtQueryResult carries the outcome of one queryFunc invocation. Which
// fields are populated depends on the kind of query (see trailing comments).
type dhtQueryResult struct {
	value         []byte      // GetValue
	peer          peer.Peer   // FindPeer
	providerPeers []peer.Peer // GetProviders
	closerPeers   []peer.Peer // *

	// success marks the query as finished; Run stops once a worker sets it.
	success bool
}
// constructs query
func newQuery(k u.Key, d inet.Dialer, f queryFunc) *dhtQuery {
2014-09-18 19:30:04 -07:00
return &dhtQuery{
key: k,
dialer: d,
2014-09-18 19:30:04 -07:00
qfunc: f,
concurrency: maxQueryConcurrency,
}
2014-09-17 07:19:40 -07:00
}
// queryFunc is a function that runs a particular query with a given peer.
// It returns either:
//   - the value
//   - a list of peers potentially better able to serve the query
//   - an error
type queryFunc func(context.Context, peer.Peer) (*dhtQueryResult, error)
2014-09-18 19:30:04 -07:00
// Run runs the query at hand. pass in a list of peers to use first.
func (q *dhtQuery) Run(ctx context.Context, peers []peer.Peer) (*dhtQueryResult, error) {
2014-09-18 19:30:04 -07:00
runner := newQueryRunner(ctx, q)
return runner.Run(peers)
}
// dhtQueryRunner holds the shared, mutable state for one execution of a
// dhtQuery. Workers coordinate through its channels and counter; the
// embedded RWMutex guards result and errs.
type dhtQueryRunner struct {
	// the query to run
	query *dhtQuery

	// peersToQuery is a list of peers remaining to query
	peersToQuery *queue.ChanQueue

	// peersSeen are all the peers queried. used to prevent querying same peer 2x
	peersSeen peer.Map

	// rateLimit is a channel used to rate limit our processing (semaphore)
	rateLimit chan struct{}

	// peersRemaining is a counter of peers remaining (toQuery + processing)
	peersRemaining todoctr.Counter

	// context
	ctx    context.Context
	cancel context.CancelFunc

	// result
	result *dhtQueryResult

	// result errors
	errs []error

	// lock for concurrent access to fields
	sync.RWMutex
}
func newQueryRunner(ctx context.Context, q *dhtQuery) *dhtQueryRunner {
2014-09-17 07:19:40 -07:00
ctx, cancel := context.WithCancel(ctx)
2014-09-18 19:30:04 -07:00
return &dhtQueryRunner{
ctx: ctx,
cancel: cancel,
query: q,
peersToQuery: queue.NewChanQueue(ctx, queue.NewXORDistancePQ(q.key)),
peersRemaining: todoctr.NewSyncCounter(),
peersSeen: peer.Map{},
rateLimit: make(chan struct{}, q.concurrency),
}
}
// Run seeds the query with peers, spawns workers, and blocks until the query
// succeeds, every peer has been exhausted, or the context is done. On
// failure it returns the first worker error, the context error, or
// routing.ErrNotFound.
func (r *dhtQueryRunner) Run(peers []peer.Peer) (*dhtQueryResult, error) {
	log.Debugf("Run query with %d peers.", len(peers))

	if len(peers) == 0 {
		log.Warning("Running query with no peers!")
		// NOTE(review): returns (nil, nil) here — callers must cope with a
		// nil result alongside a nil error.
		return nil, nil
	}

	// setup concurrency rate limiting: fill the semaphore with one token
	// per permitted concurrent worker.
	for i := 0; i < r.query.concurrency; i++ {
		r.rateLimit <- struct{}{}
	}

	// add all the peers we got first.
	for _, p := range peers {
		r.addPeerToQuery(p, nil) // don't have access to self here...
	}

	// go do this thing.
	go r.spawnWorkers()

	// so workers are working.
	// wait until they're done.
	err := routing.ErrNotFound

	select {
	case <-r.peersRemaining.Done():
		// all peers processed without success.
		r.cancel() // ran all and nothing. cancel all outstanding workers.
		r.RLock()
		defer r.RUnlock()

		if len(r.errs) > 0 {
			// surface the first recorded worker error.
			err = r.errs[0]
		}

	case <-r.ctx.Done():
		// cancelled externally — or via r.cancel() after a success,
		// in which case r.result below takes precedence.
		r.RLock()
		defer r.RUnlock()
		err = r.ctx.Err()
	}

	if r.result != nil && r.result.success {
		return r.result, nil
	}

	return nil, err
}
// addPeerToQuery queues next for querying unless it was already seen or is
// farther from the key than benchmark (the peer we learned it from). A nil
// benchmark skips the distance check — used for the initial peer set.
func (r *dhtQueryRunner) addPeerToQuery(next peer.Peer, benchmark peer.Peer) {
	if next == nil {
		// wtf why are peers nil?!?
		log.Error("Query getting nil peers!!!\n")
		return
	}

	// if the new peer is further from the key than the peer we got it
	// from, don't bother (prevents looping back outward).
	if benchmark != nil && kb.Closer(benchmark.ID(), next.ID(), r.query.key) {
		return
	}

	// if already seen, no need.
	r.Lock()
	_, found := r.peersSeen[next.Key()]
	if found {
		r.Unlock()
		return
	}
	r.peersSeen[next.Key()] = next
	r.Unlock()

	log.Debugf("adding peer to query: %v\n", next)

	// do this after unlocking to prevent possible deadlocks.
	r.peersRemaining.Increment(1)
	select {
	case r.peersToQuery.EnqChan <- next:
	case <-r.ctx.Done():
	}
}
2014-09-18 19:41:41 -07:00
// spawnWorkers dequeues pending peers and launches one queryPeer goroutine
// per peer, until the query completes or the context is cancelled.
// Concurrency is bounded inside queryPeer via the rateLimit semaphore, not
// here.
func (r *dhtQueryRunner) spawnWorkers() {
	for {

		select {
		case <-r.peersRemaining.Done():
			// nothing left to query or in flight; query is over.
			return

		case <-r.ctx.Done():
			// cancelled (success elsewhere, or caller gave up).
			return

		case p, more := <-r.peersToQuery.DeqChan:
			if !more {
				return // channel closed.
			}
			log.Debugf("spawning worker for: %v\n", p)
			go r.queryPeer(p)
		}
	}
}
2014-09-17 07:19:40 -07:00
func (r *dhtQueryRunner) queryPeer(p peer.Peer) {
log.Debugf("spawned worker for: %v", p)
2014-09-18 19:41:41 -07:00
2014-09-18 19:30:04 -07:00
// make sure we rate limit concurrency.
select {
case <-r.rateLimit:
case <-r.ctx.Done():
r.peersRemaining.Decrement(1)
return
}
// ok let's do this!
2014-10-30 06:35:29 -07:00
log.Debugf("running worker for: %v", p)
// make sure we do this when we exit
defer func() {
// signal we're done proccessing peer p
2014-10-30 06:35:29 -07:00
log.Debugf("completing worker for: %v", p)
r.peersRemaining.Decrement(1)
r.rateLimit <- struct{}{}
}()
// make sure we're connected to the peer.
2014-10-21 03:02:31 -07:00
// (Incidentally, this will add it to the peerstore too)
2014-11-05 04:26:30 -08:00
err := r.query.dialer.DialPeer(r.ctx, p)
if err != nil {
2014-10-30 06:35:29 -07:00
log.Debugf("ERROR worker for: %v -- err connecting: %v", p, err)
r.Lock()
r.errs = append(r.errs, err)
r.Unlock()
return
}
2014-09-18 19:41:41 -07:00
2014-09-18 19:30:04 -07:00
// finally, run the query against this peer
res, err := r.query.qfunc(r.ctx, p)
if err != nil {
2014-10-30 06:35:29 -07:00
log.Debugf("ERROR worker for: %v %v", p, err)
2014-09-18 19:30:04 -07:00
r.Lock()
r.errs = append(r.errs, err)
r.Unlock()
} else if res.success {
2014-10-30 06:35:29 -07:00
log.Debugf("SUCCESS worker for: %v", p, res)
2014-09-18 19:30:04 -07:00
r.Lock()
r.result = res
r.Unlock()
r.cancel() // signal to everyone that we're done.
} else if res.closerPeers != nil {
log.Debugf("PEERS CLOSER -- worker for: %v", p)
2014-09-18 19:30:04 -07:00
for _, next := range res.closerPeers {
r.addPeerToQuery(next, p)
}
2014-09-17 07:19:40 -07:00
}
}