mirror of
https://github.com/fluencelabs/go-libp2p-kad-dht
synced 2025-07-31 03:52:03 +00:00
SearchValue: simplify error handling
This commit is contained in:
60
routing.go
60
routing.go
@@ -107,6 +107,7 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts
|
||||
type RecvdVal struct {
|
||||
Val []byte
|
||||
From peer.ID
|
||||
Err error
|
||||
}
|
||||
|
||||
// GetValue searches for the value corresponding to given Key.
|
||||
@@ -123,29 +124,27 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key string, opts ...ropts.Opti
|
||||
responses, errCh := dht.SearchValue(ctx, key, opts...)
|
||||
var best []byte
|
||||
|
||||
loop:
|
||||
for {
|
||||
select {
|
||||
case r, ok := <-responses:
|
||||
if !ok {
|
||||
if errCh == nil {
|
||||
break loop
|
||||
}
|
||||
responses = nil
|
||||
continue
|
||||
break
|
||||
}
|
||||
best = r
|
||||
|
||||
case err, ok := <- errCh:
|
||||
best = r
|
||||
case err, ok := <-errCh:
|
||||
if !ok {
|
||||
if responses == nil {
|
||||
break loop
|
||||
}
|
||||
errCh = nil
|
||||
continue
|
||||
break
|
||||
}
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if errCh == nil && responses == nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if best == nil {
|
||||
@@ -172,7 +171,7 @@ func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...ropts.O
|
||||
defer close(out)
|
||||
defer close(outErr)
|
||||
|
||||
valCh, errCh := dht.GetValues(ctx, key, responsesNeeded)
|
||||
valCh := dht.GetValues(ctx, key, responsesNeeded)
|
||||
vals := make([]RecvdVal, 0, responsesNeeded)
|
||||
best := -1
|
||||
|
||||
@@ -207,15 +206,15 @@ func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...ropts.O
|
||||
select {
|
||||
case v, ok := <-valCh:
|
||||
if !ok {
|
||||
if errCh == nil {
|
||||
return
|
||||
}
|
||||
// failed to find a good record
|
||||
if best < 0 {
|
||||
outErr <- routing.ErrNotFound
|
||||
}
|
||||
valCh = nil
|
||||
continue
|
||||
return
|
||||
}
|
||||
if v.Err != nil {
|
||||
outErr <- v.Err
|
||||
return
|
||||
}
|
||||
|
||||
vals = append(vals, v)
|
||||
@@ -240,16 +239,6 @@ func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...ropts.O
|
||||
out <- v.Val
|
||||
}
|
||||
}
|
||||
case err, ok := <-errCh:
|
||||
if !ok {
|
||||
if valCh == nil {
|
||||
return
|
||||
}
|
||||
errCh = nil
|
||||
continue
|
||||
}
|
||||
outErr <- err
|
||||
return
|
||||
case <-ctx.Done():
|
||||
outErr <- ctx.Err()
|
||||
return
|
||||
@@ -261,20 +250,22 @@ func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...ropts.O
|
||||
}
|
||||
|
||||
// GetValues gets nvals values corresponding to the given key.
|
||||
func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (<-chan RecvdVal, <-chan error) {
|
||||
func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) <-chan RecvdVal {
|
||||
eip := log.EventBegin(ctx, "GetValues")
|
||||
|
||||
vals := make(chan RecvdVal, nvals)
|
||||
// alloc 1 to have for sync errors
|
||||
vals := make(chan RecvdVal, 1)
|
||||
|
||||
done := func(err error) (<-chan RecvdVal, <-chan error) {
|
||||
done := func(err error) <-chan RecvdVal {
|
||||
defer close(vals)
|
||||
|
||||
eip.Append(loggableKey(key))
|
||||
if err != nil {
|
||||
eip.SetError(err)
|
||||
vals <- RecvdVal{Err: err}
|
||||
}
|
||||
eip.Done()
|
||||
return vals, wrapErr(err)
|
||||
return vals
|
||||
}
|
||||
|
||||
// If we have it local, don't bother doing an RPC!
|
||||
@@ -365,9 +356,7 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (<-cha
|
||||
return res, nil
|
||||
})
|
||||
|
||||
errCh := make(chan error, 1)
|
||||
go func() {
|
||||
defer close(errCh)
|
||||
reqCtx, cancel := context.WithTimeout(ctx, time.Minute)
|
||||
defer cancel()
|
||||
|
||||
@@ -380,13 +369,10 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (<-cha
|
||||
if got > 0 && (err == routing.ErrNotFound || reqCtx.Err() == context.DeadlineExceeded) {
|
||||
err = nil
|
||||
}
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
done(err)
|
||||
}()
|
||||
|
||||
return vals, errCh
|
||||
return vals
|
||||
}
|
||||
|
||||
// Provider abstraction for indirect stores.
|
||||
|
Reference in New Issue
Block a user