86 lines
1.7 KiB
Go
Raw Normal View History

2014-09-17 07:19:40 -07:00
package dht
import (
peer "github.com/jbenet/go-ipfs/peer"
queue "github.com/jbenet/go-ipfs/peer/queue"
u "github.com/jbenet/go-ipfs/util"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
)
type dhtQuery struct {
// a PeerQueue
peers queue.PeerQueue
// the function to execute per peer
qfunc queryFunc
}
// QueryFunc is a function that runs a particular query with a given peer.
// It returns either:
// - the value
// - a list of peers potentially better able to serve the query
// - an error
type queryFunc func(context.Context, *peer.Peer) (interface{}, []*peer.Peer, error)
func (q *dhtQuery) Run(ctx context.Context, concurrency int) (interface{}, error) {
// get own cancel function to signal when we've found the value
ctx, cancel := context.WithCancel(ctx)
// the variable waiting to be populated upon success
var result interface{}
// chanQueue is how workers receive their work
chanQueue := queue.NewChanQueue(ctx, q.peers)
// worker
worker := func() {
for {
select {
case p := <-chanQueue.DeqChan:
val, closer, err := q.qfunc(ctx, p)
if err != nil {
u.PErr("error running query: %v\n", err)
continue
}
if val != nil {
result = val
cancel() // signal we're done.
return
}
if closer != nil {
for _, p := range closer {
select {
case chanQueue.EnqChan <- p:
case <-ctx.Done():
return
}
}
}
case <-ctx.Done():
return
}
}
}
// launch all workers
for i := 0; i < concurrency; i++ {
go worker()
}
// wait until we're done. yep.
select {
case <-ctx.Done():
}
if result != nil {
return result, nil
}
return nil, ctx.Err()
}