diff --git a/dht_net.go b/dht_net.go index 0af33f3..b8bfe84 100644 --- a/dht_net.go +++ b/dht_net.go @@ -9,7 +9,6 @@ import ( "time" ggio "github.com/gogo/protobuf/io" - ctxio "github.com/jbenet/go-context/io" "github.com/libp2p/go-libp2p-kad-dht/metrics" pb "github.com/libp2p/go-libp2p-kad-dht/pb" inet "github.com/libp2p/go-libp2p-net" @@ -19,6 +18,7 @@ import ( ) var dhtReadMessageTimeout = time.Minute +var dhtStreamIdleTimeout = 10 * time.Minute var ErrReadTimeout = fmt.Errorf("timed out reading response") // The Protobuf writer performs multiple small writes when writing a message. @@ -67,12 +67,12 @@ func (dht *IpfsDHT) handleNewStream(s inet.Stream) { // Returns true on orderly completion of writes (so we can Close the stream). func (dht *IpfsDHT) handleNewMessage(s inet.Stream) bool { ctx := dht.ctx - - cr := ctxio.NewReader(ctx, s) // ok to use. we defer close stream in this func - cw := ctxio.NewWriter(ctx, s) // ok to use. we defer close stream in this func - r := ggio.NewDelimitedReader(cr, inet.MessageSizeMax) + r := ggio.NewDelimitedReader(s, inet.MessageSizeMax) mPeer := s.Conn().RemotePeer() + timer := time.AfterFunc(dhtStreamIdleTimeout, func() { s.Reset() }) + defer timer.Stop() + for { var req pb.Message switch err := r.ReadMsg(&req); err { @@ -93,6 +93,8 @@ func (dht *IpfsDHT) handleNewMessage(s inet.Stream) bool { case nil: } + timer.Reset(dhtStreamIdleTimeout) + startTime := time.Now() ctx, _ = tag.New( ctx, @@ -126,7 +128,7 @@ func (dht *IpfsDHT) handleNewMessage(s inet.Stream) bool { } // send out response msg - err = writeMsg(cw, resp) + err = writeMsg(s, resp) if err != nil { stats.Record(ctx, metrics.ReceivedMessageErrors.M(1)) logger.Debugf("error writing response: %v", err) diff --git a/go.mod b/go.mod index 42d18fc..b396a71 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,6 @@ require ( github.com/ipfs/go-ipfs-util v0.0.1 github.com/ipfs/go-log v0.0.1 github.com/ipfs/go-todocounter v0.0.1 - github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 github.com/jbenet/goprocess v0.1.3 github.com/libp2p/go-libp2p v0.0.30 github.com/libp2p/go-libp2p-crypto v0.0.2 diff --git a/go.sum b/go.sum index bb6f8d6..544b944 100644 --- a/go.sum +++ b/go.sum @@ -84,8 +84,6 @@ github.com/jbenet/go-cienv v0.0.0-20150120210510-1bb1476777ec h1:DQqZhhDvrTrEQ3Q github.com/jbenet/go-cienv v0.0.0-20150120210510-1bb1476777ec/go.mod h1:rGaEvXB4uRSZMmzKNLoXvTu1sfx+1kv/DojUlPrSZGs= github.com/jbenet/go-cienv v0.1.0 h1:Vc/s0QbQtoxX8MwwSLWWh+xNNZvM3Lw7NsTcHrvvhMc= github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5Djy1+FlA= -github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 h1:BQSFePA1RWJOlocH6Fxy8MmwDt+yVQYULKfN0RoTN8A= -github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo= github.com/jbenet/go-temp-err-catcher v0.0.0-20150120210811-aac704a3f4f2 h1:vhC1OXXiT9R2pczegwz6moDvuRpggaroAXhPIseh57A= github.com/jbenet/go-temp-err-catcher v0.0.0-20150120210811-aac704a3f4f2/go.mod h1:8GXXJV31xl8whumTzdZsTt3RnUIiPqzkyf7mxToRCMs= github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8 h1:bspPhN+oKYFk5fcGNuQzp6IGzYQSenLEgH3s6jkXrWw=