From c29e3e29fde2af70d972f95691e84534f021b489 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Fri, 17 Jun 2016 11:01:35 -0700 Subject: [PATCH] implement some simple dht request read timeouts License: MIT Signed-off-by: Jeromy --- dht.go | 43 ++++++++++++++++++++++++++++++++++++++++--- dht_net.go | 9 +++++++++ 2 files changed, 49 insertions(+), 3 deletions(-) diff --git a/dht.go b/dht.go index 3e06e97..b2164ff 100644 --- a/dht.go +++ b/dht.go @@ -107,6 +107,16 @@ func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID, pmes := pb.NewMessage(pb.Message_PUT_VALUE, string(key), 0) pmes.Record = rec rpmes, err := dht.sendRequest(ctx, p, pmes) + switch err { + case ErrReadTimeout: + log.Errorf("read timeout: %s %s", p, key) + fallthrough + default: + return err + case nil: + break + } + if err != nil { return err } @@ -164,7 +174,16 @@ func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID, defer log.EventBegin(ctx, "getValueSingle", p, &key).Done() pmes := pb.NewMessage(pb.Message_GET_VALUE, string(key), 0) - return dht.sendRequest(ctx, p, pmes) + resp, err := dht.sendRequest(ctx, p, pmes) + switch err { + case nil: + return resp, nil + case ErrReadTimeout: + log.Errorf("read timeout: %s %s", p, key) + fallthrough + default: + return nil, err + } } // getLocal attempts to retrieve the value from the datastore @@ -238,14 +257,32 @@ func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) ( defer log.EventBegin(ctx, "findPeerSingle", p, id).Done() pmes := pb.NewMessage(pb.Message_FIND_NODE, string(id), 0) - return dht.sendRequest(ctx, p, pmes) + resp, err := dht.sendRequest(ctx, p, pmes) + switch err { + case nil: + return resp, nil + case ErrReadTimeout: + log.Errorf("read timeout: %s %s", p, id) + fallthrough + default: + return nil, err + } } func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key key.Key) (*pb.Message, error) { defer log.EventBegin(ctx, "findProvidersSingle", p, &key).Done() pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, string(key), 0) - return dht.sendRequest(ctx, p, pmes) + resp, err := dht.sendRequest(ctx, p, pmes) + switch err { + case nil: + return resp, nil + case ErrReadTimeout: + log.Errorf("read timeout: %s %s", p, key) + fallthrough + default: + return nil, err + } } // nearestPeersToQuery returns the routing tables closest peers. diff --git a/dht_net.go b/dht_net.go index bc5d02d..b864d2c 100644 --- a/dht_net.go +++ b/dht_net.go @@ -1,6 +1,7 @@ package dht import ( + "fmt" "sync" "time" @@ -12,6 +13,9 @@ import ( inet "gx/ipfs/QmdBpVuSYuTGDA8Kn66CbKvEThXqKUh2nTANZEhzSxqrmJ/go-libp2p/p2p/net" ) +var dhtReadMessageTimeout = time.Minute +var ErrReadTimeout = fmt.Errorf("timed out reading response") + // handleNewStream implements the inet.StreamHandler func (dht *IpfsDHT) handleNewStream(s inet.Stream) { go dht.handleNewMessage(s) @@ -232,10 +236,15 @@ func (ms *messageSender) ctxReadMsg(ctx context.Context, mes *pb.Message) error errc <- r.ReadMsg(mes) }(ms.r) + t := time.NewTimer(dhtReadMessageTimeout) + defer t.Stop() + select { case err := <-errc: return err case <-ctx.Done(): return ctx.Err() + case <-t.C: + return ErrReadTimeout } }