mirror of
https://github.com/fluencelabs/go-libp2p-kad-dht
synced 2025-04-24 22:32:13 +00:00
query wasnt ensuring conn
The query-- once it's actually attempting to connect to a peer-- should be the one connecting.
This commit is contained in:
parent
ae40568b44
commit
5a2dc1a9f5
41
query.go
41
query.go
@ -14,10 +14,18 @@ import (
|
||||
|
||||
var maxQueryConcurrency = AlphaValue
|
||||
|
||||
type dhtDialer interface {
|
||||
// DialPeer attempts to establish a connection to a given peer
|
||||
DialPeer(peer.Peer) error
|
||||
}
|
||||
|
||||
type dhtQuery struct {
|
||||
// the key we're querying for
|
||||
key u.Key
|
||||
|
||||
// dialer used to ensure we're connected to peers
|
||||
dialer dhtDialer
|
||||
|
||||
// the function to execute per peer
|
||||
qfunc queryFunc
|
||||
|
||||
@ -34,9 +42,10 @@ type dhtQueryResult struct {
|
||||
}
|
||||
|
||||
// constructs query
|
||||
func newQuery(k u.Key, f queryFunc) *dhtQuery {
|
||||
func newQuery(k u.Key, d dhtDialer, f queryFunc) *dhtQuery {
|
||||
return &dhtQuery{
|
||||
key: k,
|
||||
dialer: d,
|
||||
qfunc: f,
|
||||
concurrency: maxQueryConcurrency,
|
||||
}
|
||||
@ -211,19 +220,38 @@ func (r *dhtQueryRunner) queryPeer(p peer.Peer) {
|
||||
return
|
||||
}
|
||||
|
||||
log.Debug("running worker for: %v\n", p)
|
||||
// ok let's do this!
|
||||
log.Debug("running worker for: %v", p)
|
||||
|
||||
// make sure we do this when we exit
|
||||
defer func() {
|
||||
// signal we're done proccessing peer p
|
||||
log.Debug("completing worker for: %v", p)
|
||||
r.peersRemaining.Decrement(1)
|
||||
r.rateLimit <- struct{}{}
|
||||
}()
|
||||
|
||||
// make sure we're connected to the peer.
|
||||
err := r.query.dialer.DialPeer(p)
|
||||
if err != nil {
|
||||
log.Debug("ERROR worker for: %v -- err connecting: %v", p, err)
|
||||
r.Lock()
|
||||
r.errs = append(r.errs, err)
|
||||
r.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
// finally, run the query against this peer
|
||||
res, err := r.query.qfunc(r.ctx, p)
|
||||
|
||||
if err != nil {
|
||||
log.Debug("ERROR worker for: %v %v\n", p, err)
|
||||
log.Debug("ERROR worker for: %v %v", p, err)
|
||||
r.Lock()
|
||||
r.errs = append(r.errs, err)
|
||||
r.Unlock()
|
||||
|
||||
} else if res.success {
|
||||
log.Debug("SUCCESS worker for: %v\n", p, res)
|
||||
log.Debug("SUCCESS worker for: %v", p, res)
|
||||
r.Lock()
|
||||
r.result = res
|
||||
r.Unlock()
|
||||
@ -235,9 +263,4 @@ func (r *dhtQueryRunner) queryPeer(p peer.Peer) {
|
||||
r.addPeerToQuery(next, p)
|
||||
}
|
||||
}
|
||||
|
||||
// signal we're done proccessing peer p
|
||||
log.Debug("completing worker for: %v\n", p)
|
||||
r.peersRemaining.Decrement(1)
|
||||
r.rateLimit <- struct{}{}
|
||||
}
|
||||
|
@ -29,7 +29,7 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key u.Key, value []byte) error
|
||||
peers = append(peers, npeers...)
|
||||
}
|
||||
|
||||
query := newQuery(key, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
|
||||
query := newQuery(key, dht.network, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
|
||||
log.Debug("%s PutValue qry part %v", dht.self, p)
|
||||
err := dht.putValueToNetwork(ctx, p, string(key), value)
|
||||
if err != nil {
|
||||
@ -65,7 +65,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) {
|
||||
}
|
||||
|
||||
// setup the Query
|
||||
query := newQuery(key, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
|
||||
query := newQuery(key, dht.network, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
|
||||
|
||||
val, peers, err := dht.getValueOrPeers(ctx, p, key, routeLevel)
|
||||
if err != nil {
|
||||
@ -230,7 +230,7 @@ func (dht *IpfsDHT) findPeerMultiple(ctx context.Context, id peer.ID) (peer.Peer
|
||||
}
|
||||
|
||||
// setup query function
|
||||
query := newQuery(u.Key(id), func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
|
||||
query := newQuery(u.Key(id), dht.network, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
|
||||
pmes, err := dht.findPeerSingle(ctx, p, id, routeLevel)
|
||||
if err != nil {
|
||||
log.Error("%s getPeer error: %v", dht.self, err)
|
||||
|
Loading…
x
Reference in New Issue
Block a user