mirror of
https://github.com/fluencelabs/go-libp2p-kad-dht
synced 2025-04-24 22:32:13 +00:00
SearchValue: address review
This commit is contained in:
parent
51c475d117
commit
d72432caf1
@ -36,6 +36,7 @@ func TestGetFailures(t *testing.T) {
|
||||
|
||||
// Reply with failures to every message
|
||||
hosts[1].SetStreamHandler(d.protocols[0], func(s inet.Stream) {
|
||||
time.Sleep(400 * time.Millisecond)
|
||||
s.Close()
|
||||
})
|
||||
|
||||
@ -47,7 +48,7 @@ func TestGetFailures(t *testing.T) {
|
||||
err = merr[0]
|
||||
}
|
||||
|
||||
if err != routing.ErrNotFound {
|
||||
if err != context.DeadlineExceeded {
|
||||
t.Fatal("Got different error than we expected", err)
|
||||
}
|
||||
} else {
|
||||
|
29
routing.go
29
routing.go
@ -137,6 +137,10 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...ropts.Opti
|
||||
best = r
|
||||
}
|
||||
|
||||
if ctx.Err() != nil {
|
||||
return best, ctx.Err()
|
||||
}
|
||||
|
||||
if best == nil {
|
||||
return nil, routing.ErrNotFound
|
||||
}
|
||||
@ -164,10 +168,12 @@ func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...ropts.O
|
||||
go func() {
|
||||
defer close(out)
|
||||
|
||||
if responsesNeeded < 0 {
|
||||
responsesNeeded = 0
|
||||
maxVals := responsesNeeded
|
||||
if maxVals < 0 {
|
||||
maxVals = defaultQuorum * 4 // we want some upper bound on how
|
||||
// much correctional entries we will send
|
||||
}
|
||||
vals := make([]RecvdVal, 0, responsesNeeded)
|
||||
vals := make([]RecvdVal, 0, maxVals)
|
||||
best := -1
|
||||
|
||||
defer func() {
|
||||
@ -204,18 +210,25 @@ func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...ropts.O
|
||||
return
|
||||
}
|
||||
|
||||
vals = append(vals, v)
|
||||
i := len(vals)
|
||||
if len(vals) < maxVals {
|
||||
vals = append(vals, v)
|
||||
} else {
|
||||
i = (best + 1) % maxVals
|
||||
vals[i] = v
|
||||
}
|
||||
|
||||
if v.Val == nil {
|
||||
continue
|
||||
}
|
||||
// Select best value
|
||||
if best > -1 {
|
||||
i, err := dht.Validator.Select(key, [][]byte{vals[best].Val, v.Val})
|
||||
sel, err := dht.Validator.Select(key, [][]byte{vals[best].Val, v.Val})
|
||||
if err != nil {
|
||||
continue //TODO: Do we want to do something with the error here?
|
||||
log.Warning("Failed to select dht key: ", err)
|
||||
continue
|
||||
}
|
||||
if i != best && !bytes.Equal(v.Val, vals[best].Val) {
|
||||
if sel == 1 && !bytes.Equal(v.Val, vals[best].Val) {
|
||||
best = i
|
||||
out <- v.Val
|
||||
}
|
||||
@ -247,7 +260,7 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []R
|
||||
out = append(out, val)
|
||||
}
|
||||
|
||||
return out, nil
|
||||
return out, ctx.Err()
|
||||
}
|
||||
|
||||
func (dht *IpfsDHT) getValues(ctx context.Context, key string, nvals int) (<-chan RecvdVal, error) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user