package dht

import (
	"sync"

	peer "github.com/jbenet/go-ipfs/peer"
	queue "github.com/jbenet/go-ipfs/peer/queue"
	kb "github.com/jbenet/go-ipfs/routing/kbucket"
	u "github.com/jbenet/go-ipfs/util"
	todoctr "github.com/jbenet/go-ipfs/util/todocounter"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
)

// maxQueryConcurrency caps the number of peers queried in parallel for a
// single query (the DHT's alpha parameter).
var maxQueryConcurrency = AlphaValue

// dhtQuery describes a single query to run against the DHT.
type dhtQuery struct {
	// the key we're querying for
	key u.Key

	// the function to execute per peer
	qfunc queryFunc

	// the concurrency parameter
	concurrency int
}

// dhtQueryResult holds the possible outcomes of a single queryFunc call.
type dhtQueryResult struct {
	value         []byte       // GetValue
	peer          *peer.Peer   // FindPeer
	providerPeers []*peer.Peer // GetProviders
	closerPeers   []*peer.Peer // *
	success       bool
}

// newQuery constructs a dhtQuery for the given key, using f as the
// per-peer query function.
func newQuery(k u.Key, f queryFunc) *dhtQuery {
	return &dhtQuery{
		key:         k,
		qfunc:       f,
		concurrency: maxQueryConcurrency,
	}
}

// queryFunc is a function that runs a particular query with a given peer.
// It returns either:
//   - the value
//   - a list of peers potentially better able to serve the query
//   - an error
type queryFunc func(context.Context, *peer.Peer) (*dhtQueryResult, error)

// Run runs the query at hand. Pass in a list of peers to use first.
func (q *dhtQuery) Run(ctx context.Context, peers []*peer.Peer) (*dhtQueryResult, error) {
	runner := newQueryRunner(ctx, q)
	return runner.Run(peers)
}
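
// Illustrative only: a sketch of how a caller might wire these pieces
// together, assuming a hypothetical per-peer RPC helper named fetchFromPeer
// (not part of this file). The actual callers live elsewhere in this package.
//
//	query := newQuery(key, func(ctx context.Context, p *peer.Peer) (*dhtQueryResult, error) {
//		value, closer, err := fetchFromPeer(ctx, p, key) // hypothetical helper
//		if err != nil {
//			return nil, err
//		}
//		if value != nil {
//			return &dhtQueryResult{value: value, success: true}, nil
//		}
//		return &dhtQueryResult{closerPeers: closer}, nil
//	})
//	result, err := query.Run(ctx, closestKnownPeers)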

// dhtQueryRunner coordinates the workers that execute a single dhtQuery.
type dhtQueryRunner struct {

	// the query to run
	query *dhtQuery

	// peersToQuery is a list of peers remaining to query
	peersToQuery *queue.ChanQueue

	// peersSeen are all the peers queried. used to prevent querying the same peer twice
	peersSeen peer.Map

	// rateLimit is a channel used to rate limit our processing (semaphore)
	rateLimit chan struct{}

	// peersRemaining is a counter of peers remaining (toQuery + processing)
	peersRemaining todoctr.Counter

	// context
	ctx    context.Context
	cancel context.CancelFunc

	// result
	result *dhtQueryResult

	// result errors
	errs []error

	// lock for concurrent access to fields
	sync.RWMutex
}
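
// The rateLimit channel above is a counting semaphore built from a buffered
// channel, a common Go pattern. A minimal standalone sketch of the same idea
// (the names below are illustrative, not part of this package):
//
//	sem := make(chan struct{}, concurrency)
//	for i := 0; i < concurrency; i++ {
//		sem <- struct{}{} // pre-fill: one token per worker slot
//	}
//	// each worker:
//	<-sem             // take a token before doing work
//	sem <- struct{}{} // return the token when done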

func newQueryRunner(ctx context.Context, q *dhtQuery) *dhtQueryRunner {
	ctx, cancel := context.WithCancel(ctx)

	return &dhtQueryRunner{
		ctx:            ctx,
		cancel:         cancel,
		query:          q,
		peersToQuery:   queue.NewChanQueue(ctx, queue.NewXORDistancePQ(q.key)),
		peersRemaining: todoctr.NewSyncCounter(),
		peersSeen:      peer.Map{},
		rateLimit:      make(chan struct{}, q.concurrency),
	}
}

// Run starts the workers and blocks until a successful result is found, every
// reachable peer has been tried, or the context is cancelled.
func (r *dhtQueryRunner) Run(peers []*peer.Peer) (*dhtQueryResult, error) {
	log.Debug("Run query with %d peers.", len(peers))
	if len(peers) == 0 {
		log.Warning("Running query with no peers!")
		return nil, nil
	}

	// setup concurrency rate limiting
	for i := 0; i < r.query.concurrency; i++ {
		r.rateLimit <- struct{}{}
	}

	// add all the peers we got first.
	for _, p := range peers {
		r.addPeerToQuery(p, nil) // don't have access to self here...
	}

	// go do this thing.
	go r.spawnWorkers()

	// so workers are working.

	// wait until they're done.
	err := u.ErrNotFound

	select {
	case <-r.peersRemaining.Done():
		r.cancel() // ran all and nothing. cancel all outstanding workers.
		r.RLock()
		defer r.RUnlock()

		if len(r.errs) > 0 {
			err = r.errs[0]
		}

	case <-r.ctx.Done():
		r.RLock()
		defer r.RUnlock()
		err = r.ctx.Err()
	}

	if r.result != nil && r.result.success {
		return r.result, nil
	}

	return nil, err
}

// addPeerToQuery adds a candidate peer to the queue of peers to query,
// skipping nil peers, peers we've already seen, and peers farther from the
// key than the peer that referred them.
func (r *dhtQueryRunner) addPeerToQuery(next *peer.Peer, benchmark *peer.Peer) {
	if next == nil {
		// wtf why are peers nil?!?
		log.Error("Query getting nil peers!!!\n")
		return
	}

	// if new peer is further away than whom we got it from, don't bother (loops)
	if benchmark != nil && kb.Closer(benchmark.ID, next.ID, r.query.key) {
		return
	}

	// if already seen, no need.
	r.Lock()
	_, found := r.peersSeen[next.Key()]
	if found {
		r.Unlock()
		return
	}
	r.peersSeen[next.Key()] = next
	r.Unlock()

	log.Debug("adding peer to query: %v\n", next)

	// do this after unlocking to prevent possible deadlocks.
	r.peersRemaining.Increment(1)
	select {
	case r.peersToQuery.EnqChan <- next:
	case <-r.ctx.Done():
	}
}

// spawnWorkers dequeues peers and starts a worker per peer until the query
// finishes or the context is cancelled.
func (r *dhtQueryRunner) spawnWorkers() {
	for {
		select {
		case <-r.peersRemaining.Done():
			return

		case <-r.ctx.Done():
			return

		case p, more := <-r.peersToQuery.DeqChan:
			if !more {
				return // channel closed.
			}
			log.Debug("spawning worker for: %v\n", p)
			go r.queryPeer(p)
		}
	}
}

// queryPeer runs the query's qfunc against a single peer, respecting the
// rate limit, and records the outcome.
func (r *dhtQueryRunner) queryPeer(p *peer.Peer) {
	log.Debug("spawned worker for: %v\n", p)

	// make sure we rate limit concurrency.
	select {
	case <-r.rateLimit:
	case <-r.ctx.Done():
		r.peersRemaining.Decrement(1)
		return
	}

	log.Debug("running worker for: %v\n", p)

	// finally, run the query against this peer
	res, err := r.query.qfunc(r.ctx, p)

	if err != nil {
		log.Debug("ERROR worker for: %v %v\n", p, err)
		r.Lock()
		r.errs = append(r.errs, err)
		r.Unlock()

	} else if res.success {
		log.Debug("SUCCESS worker for: %v %v\n", p, res)
		r.Lock()
		r.result = res
		r.Unlock()
		r.cancel() // signal to everyone that we're done.

	} else if res.closerPeers != nil {
		log.Debug("PEERS CLOSER -- worker for: %v\n", p)
		for _, next := range res.closerPeers {
			r.addPeerToQuery(next, p)
		}
	}

	// signal we're done processing peer p
	log.Debug("completing worker for: %v\n", p)
	r.peersRemaining.Decrement(1)
	r.rateLimit <- struct{}{}
}