mirror of
https://github.com/fluencelabs/go-libp2p-kad-dht
synced 2025-04-25 06:42:13 +00:00
implement some simple dht request read timeouts
License: MIT Signed-off-by: Jeromy <why@ipfs.io>
This commit is contained in:
parent
3f05aa0579
commit
c29e3e29fd
43
dht.go
43
dht.go
@ -107,6 +107,16 @@ func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID,
|
|||||||
pmes := pb.NewMessage(pb.Message_PUT_VALUE, string(key), 0)
|
pmes := pb.NewMessage(pb.Message_PUT_VALUE, string(key), 0)
|
||||||
pmes.Record = rec
|
pmes.Record = rec
|
||||||
rpmes, err := dht.sendRequest(ctx, p, pmes)
|
rpmes, err := dht.sendRequest(ctx, p, pmes)
|
||||||
|
switch err {
|
||||||
|
case ErrReadTimeout:
|
||||||
|
log.Errorf("read timeout: %s %s", p, key)
|
||||||
|
fallthrough
|
||||||
|
default:
|
||||||
|
return err
|
||||||
|
case nil:
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -164,7 +174,16 @@ func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID,
|
|||||||
defer log.EventBegin(ctx, "getValueSingle", p, &key).Done()
|
defer log.EventBegin(ctx, "getValueSingle", p, &key).Done()
|
||||||
|
|
||||||
pmes := pb.NewMessage(pb.Message_GET_VALUE, string(key), 0)
|
pmes := pb.NewMessage(pb.Message_GET_VALUE, string(key), 0)
|
||||||
return dht.sendRequest(ctx, p, pmes)
|
resp, err := dht.sendRequest(ctx, p, pmes)
|
||||||
|
switch err {
|
||||||
|
case nil:
|
||||||
|
return resp, nil
|
||||||
|
case ErrReadTimeout:
|
||||||
|
log.Errorf("read timeout: %s %s", p, key)
|
||||||
|
fallthrough
|
||||||
|
default:
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// getLocal attempts to retrieve the value from the datastore
|
// getLocal attempts to retrieve the value from the datastore
|
||||||
@ -238,14 +257,32 @@ func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (
|
|||||||
defer log.EventBegin(ctx, "findPeerSingle", p, id).Done()
|
defer log.EventBegin(ctx, "findPeerSingle", p, id).Done()
|
||||||
|
|
||||||
pmes := pb.NewMessage(pb.Message_FIND_NODE, string(id), 0)
|
pmes := pb.NewMessage(pb.Message_FIND_NODE, string(id), 0)
|
||||||
return dht.sendRequest(ctx, p, pmes)
|
resp, err := dht.sendRequest(ctx, p, pmes)
|
||||||
|
switch err {
|
||||||
|
case nil:
|
||||||
|
return resp, nil
|
||||||
|
case ErrReadTimeout:
|
||||||
|
log.Errorf("read timeout: %s %s", p, id)
|
||||||
|
fallthrough
|
||||||
|
default:
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key key.Key) (*pb.Message, error) {
|
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key key.Key) (*pb.Message, error) {
|
||||||
defer log.EventBegin(ctx, "findProvidersSingle", p, &key).Done()
|
defer log.EventBegin(ctx, "findProvidersSingle", p, &key).Done()
|
||||||
|
|
||||||
pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, string(key), 0)
|
pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, string(key), 0)
|
||||||
return dht.sendRequest(ctx, p, pmes)
|
resp, err := dht.sendRequest(ctx, p, pmes)
|
||||||
|
switch err {
|
||||||
|
case nil:
|
||||||
|
return resp, nil
|
||||||
|
case ErrReadTimeout:
|
||||||
|
log.Errorf("read timeout: %s %s", p, key)
|
||||||
|
fallthrough
|
||||||
|
default:
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// nearestPeersToQuery returns the routing tables closest peers.
|
// nearestPeersToQuery returns the routing tables closest peers.
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
package dht
|
package dht
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -12,6 +13,9 @@ import (
|
|||||||
inet "gx/ipfs/QmdBpVuSYuTGDA8Kn66CbKvEThXqKUh2nTANZEhzSxqrmJ/go-libp2p/p2p/net"
|
inet "gx/ipfs/QmdBpVuSYuTGDA8Kn66CbKvEThXqKUh2nTANZEhzSxqrmJ/go-libp2p/p2p/net"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var dhtReadMessageTimeout = time.Minute
|
||||||
|
var ErrReadTimeout = fmt.Errorf("timed out reading response")
|
||||||
|
|
||||||
// handleNewStream implements the inet.StreamHandler
|
// handleNewStream implements the inet.StreamHandler
|
||||||
func (dht *IpfsDHT) handleNewStream(s inet.Stream) {
|
func (dht *IpfsDHT) handleNewStream(s inet.Stream) {
|
||||||
go dht.handleNewMessage(s)
|
go dht.handleNewMessage(s)
|
||||||
@ -232,10 +236,15 @@ func (ms *messageSender) ctxReadMsg(ctx context.Context, mes *pb.Message) error
|
|||||||
errc <- r.ReadMsg(mes)
|
errc <- r.ReadMsg(mes)
|
||||||
}(ms.r)
|
}(ms.r)
|
||||||
|
|
||||||
|
t := time.NewTimer(dhtReadMessageTimeout)
|
||||||
|
defer t.Stop()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case err := <-errc:
|
case err := <-errc:
|
||||||
return err
|
return err
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return ctx.Err()
|
return ctx.Err()
|
||||||
|
case <-t.C:
|
||||||
|
return ErrReadTimeout
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user