mirror of
https://github.com/fluencelabs/go-libp2p-kad-dht
synced 2025-04-24 22:32:13 +00:00
error on queries to empty routing table
This commit is contained in:
parent
a9e21d2672
commit
719a16ff88
@ -8,6 +8,7 @@ import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
@ -1674,8 +1675,8 @@ func TestGetSetPluggedProtocol(t *testing.T) {
|
||||
time.Sleep(time.Second * 2)
|
||||
|
||||
err = dhtA.PutValue(ctx, "/v/cat", []byte("meow"))
|
||||
if err != nil {
|
||||
t.Fatalf("putting to an empty routing table should succeed, err: '%v'", err)
|
||||
if err == nil || !strings.Contains(err.Error(), "failed to find any peer in table") {
|
||||
t.Fatalf("put should not have been able to find any peers in routing table, err:'%v'", err)
|
||||
}
|
||||
|
||||
v, err := dhtB.GetValue(ctx, "/v/cat")
|
||||
|
@ -73,7 +73,7 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee
|
||||
e := logger.EventBegin(ctx, "getClosestPeers", loggableKey(key))
|
||||
defer e.Done()
|
||||
|
||||
queries := dht.runDisjointQueries(ctx, dht.d, key,
|
||||
queries, err := dht.runDisjointQueries(ctx, dht.d, key,
|
||||
func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
|
||||
// For DHT query command
|
||||
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
|
||||
@ -100,6 +100,10 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee
|
||||
func(peerset *kpeerset.SortedPeerset) bool { return false },
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
out := make(chan peer.ID, dht.bucketSize)
|
||||
defer close(out)
|
||||
|
||||
|
19
query.go
19
query.go
@ -31,21 +31,20 @@ type qu struct {
|
||||
stopFn sfn
|
||||
}
|
||||
|
||||
func (dht *IpfsDHT) runDisjointQueries(ctx context.Context, d int, target string, queryFn qfn, stopFn sfn) []*qu {
|
||||
func (dht *IpfsDHT) runDisjointQueries(ctx context.Context, d int, target string, queryFn qfn, stopFn sfn) ([]*qu, error) {
|
||||
queryCtx, cancelQuery := context.WithCancel(ctx)
|
||||
|
||||
numQueriesComplete := 0
|
||||
queryDone := make(chan struct{}, d)
|
||||
|
||||
seedPeers := dht.routingTable.NearestPeers(kb.ConvertKey(target), KValue)
|
||||
// TODO does this need to be here?
|
||||
//if len(seedPeers) == 0 {
|
||||
// routing.PublishQueryEvent(ctx, &routing.QueryEvent{
|
||||
// Type: routing.QueryError,
|
||||
// Extra: kb.ErrLookupFailure.Error(),
|
||||
// })
|
||||
// return nil, kb.ErrLookupFailure
|
||||
//}
|
||||
if len(seedPeers) == 0 {
|
||||
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
|
||||
Type: routing.QueryError,
|
||||
Extra: kb.ErrLookupFailure.Error(),
|
||||
})
|
||||
return nil, kb.ErrLookupFailure
|
||||
}
|
||||
|
||||
dht.rng.Shuffle(len(seedPeers), func(i, j int) {
|
||||
seedPeers[i], seedPeers[j] = seedPeers[j], seedPeers[i]
|
||||
@ -93,7 +92,7 @@ loop:
|
||||
}
|
||||
}
|
||||
|
||||
return queries
|
||||
return queries, nil
|
||||
}
|
||||
|
||||
func (dht *IpfsDHT) sortPeers(peers []kpeerset.IPeerMetric) kpeerset.SortablePeers {
|
||||
|
10
routing.go
10
routing.go
@ -332,7 +332,7 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan st
|
||||
}
|
||||
|
||||
go func() {
|
||||
queries := dht.runDisjointQueries(ctx, dht.d, key,
|
||||
queries, _ := dht.runDisjointQueries(ctx, dht.d, key,
|
||||
func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
|
||||
// For DHT query command
|
||||
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
|
||||
@ -553,7 +553,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash
|
||||
}
|
||||
}
|
||||
|
||||
dht.runDisjointQueries(ctx, dht.d, string(key),
|
||||
_, _ = dht.runDisjointQueries(ctx, dht.d, string(key),
|
||||
func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
|
||||
// For DHT query command
|
||||
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
|
||||
@ -630,7 +630,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo,
|
||||
return pi, nil
|
||||
}
|
||||
|
||||
queries := dht.runDisjointQueries(ctx, dht.d, string(id),
|
||||
queries, err := dht.runDisjointQueries(ctx, dht.d, string(id),
|
||||
func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
|
||||
// For DHT query command
|
||||
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
|
||||
@ -659,6 +659,10 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo,
|
||||
},
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
return peer.AddrInfo{}, err
|
||||
}
|
||||
|
||||
// logger.Debugf("FindPeer %v %v", id, result.success)
|
||||
|
||||
if dht.host.Network().Connectedness(id) == network.Connected {
|
||||
|
Loading…
x
Reference in New Issue
Block a user