Address review comments

Update msgio to latest version
Use max size in msgio readers
Fix error handling in reads
This commit is contained in:
Cole Brown 2019-06-18 08:52:46 -04:00
parent f0593aa9f9
commit 855b46d37e
3 changed files with 22 additions and 17 deletions

View File

@ -72,7 +72,7 @@ func (dht *IpfsDHT) handleNewStream(s network.Stream) {
// Returns true on orderly completion of writes (so we can Close the stream). // Returns true on orderly completion of writes (so we can Close the stream).
func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool { func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
ctx := dht.ctx ctx := dht.ctx
r := msgio.NewVarintReader(s) r := msgio.NewVarintReaderSize(s, network.MessageSizeMax)
mPeer := s.Conn().RemotePeer() mPeer := s.Conn().RemotePeer()
@ -83,22 +83,14 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
var req pb.Message var req pb.Message
msgbytes, err := r.ReadMsg() msgbytes, err := r.ReadMsg()
if err != nil { if err != nil {
logger.Debugf("error reading message: %#v", err) defer r.ReleaseMsg(msgbytes)
stats.RecordWithTags( if err == io.EOF {
ctx,
[]tag.Mutator{tag.Upsert(metrics.KeyMessageType, "UNKNOWN")},
metrics.ReceivedMessageErrors.M(1),
)
return false
}
switch err := req.Unmarshal(msgbytes); err {
case io.EOF:
return true return true
default: }
// This string test is necessary because there isn't a single stream reset error // This string test is necessary because there isn't a single stream reset error
// instance in use. // instance in use.
if err.Error() != "stream reset" { if err.Error() != "stream reset" {
logger.Debugf("error unmarshalling message: %#v", err) logger.Debugf("error reading message: %#v", err)
} }
stats.RecordWithTags( stats.RecordWithTags(
ctx, ctx,
@ -106,7 +98,17 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
metrics.ReceivedMessageErrors.M(1), metrics.ReceivedMessageErrors.M(1),
) )
return false return false
case nil: }
err = req.Unmarshal(msgbytes)
r.ReleaseMsg(msgbytes)
if err != nil {
logger.Debugf("error unmarshalling message: %#v", err)
stats.RecordWithTags(
ctx,
[]tag.Mutator{tag.Upsert(metrics.KeyMessageType, "UNKNOWN")},
metrics.ReceivedMessageErrors.M(1),
)
return false
} }
timer.Reset(dhtStreamIdleTimeout) timer.Reset(dhtStreamIdleTimeout)
@ -302,7 +304,7 @@ func (ms *messageSender) prep(ctx context.Context) error {
return err return err
} }
ms.r = msgio.NewVarintReader(nstr) ms.r = msgio.NewVarintReaderSize(nstr, network.MessageSizeMax)
ms.s = nstr ms.s = nstr
return nil return nil
@ -405,6 +407,7 @@ func (ms *messageSender) ctxReadMsg(ctx context.Context, mes *pb.Message) error
errc := make(chan error, 1) errc := make(chan error, 1)
go func(r msgio.ReadCloser) { go func(r msgio.ReadCloser) {
bytes, err := r.ReadMsg() bytes, err := r.ReadMsg()
defer r.ReleaseMsg(bytes)
if err != nil { if err != nil {
errc <- err errc <- err
return return

2
go.mod
View File

@ -17,7 +17,7 @@ require (
github.com/libp2p/go-libp2p-routing v0.1.0 github.com/libp2p/go-libp2p-routing v0.1.0
github.com/libp2p/go-libp2p-swarm v0.1.0 github.com/libp2p/go-libp2p-swarm v0.1.0
github.com/libp2p/go-libp2p-testing v0.0.3 github.com/libp2p/go-libp2p-testing v0.0.3
github.com/libp2p/go-msgio v0.0.2 github.com/libp2p/go-msgio v0.0.4
github.com/mr-tron/base58 v1.1.2 github.com/mr-tron/base58 v1.1.2
github.com/multiformats/go-multiaddr v0.0.4 github.com/multiformats/go-multiaddr v0.0.4
github.com/multiformats/go-multiaddr-dns v0.0.2 github.com/multiformats/go-multiaddr-dns v0.0.2

2
go.sum
View File

@ -155,6 +155,8 @@ github.com/libp2p/go-mplex v0.1.0 h1:/nBTy5+1yRyY82YaO6HXQRnO5IAGsXTjEJaR3LdTPc0
github.com/libp2p/go-mplex v0.1.0/go.mod h1:SXgmdki2kwCUlCCbfGLEgHjC4pFqhTp0ZoV6aiKgxDU= github.com/libp2p/go-mplex v0.1.0/go.mod h1:SXgmdki2kwCUlCCbfGLEgHjC4pFqhTp0ZoV6aiKgxDU=
github.com/libp2p/go-msgio v0.0.2 h1:ivPvEKHxmVkTClHzg6RXTYHqaJQ0V9cDbq+6lKb3UV0= github.com/libp2p/go-msgio v0.0.2 h1:ivPvEKHxmVkTClHzg6RXTYHqaJQ0V9cDbq+6lKb3UV0=
github.com/libp2p/go-msgio v0.0.2/go.mod h1:63lBBgOTDKQL6EWazRMCwXsEeEeK9O2Cd+0+6OOuipQ= github.com/libp2p/go-msgio v0.0.2/go.mod h1:63lBBgOTDKQL6EWazRMCwXsEeEeK9O2Cd+0+6OOuipQ=
github.com/libp2p/go-msgio v0.0.4 h1:agEFehY3zWJFUHK6SEMR7UYmk2z6kC3oeCM7ybLhguA=
github.com/libp2p/go-msgio v0.0.4/go.mod h1:63lBBgOTDKQL6EWazRMCwXsEeEeK9O2Cd+0+6OOuipQ=
github.com/libp2p/go-nat v0.0.3 h1:l6fKV+p0Xa354EqQOQP+d8CivdLM4kl5GxC1hSc/UeI= github.com/libp2p/go-nat v0.0.3 h1:l6fKV+p0Xa354EqQOQP+d8CivdLM4kl5GxC1hSc/UeI=
github.com/libp2p/go-nat v0.0.3/go.mod h1:88nUEt0k0JD45Bk93NIwDqjlhiOwOoV36GchpcVc1yI= github.com/libp2p/go-nat v0.0.3/go.mod h1:88nUEt0k0JD45Bk93NIwDqjlhiOwOoV36GchpcVc1yI=
github.com/libp2p/go-reuseport v0.0.1 h1:7PhkfH73VXfPJYKQ6JwS5I/eVcoyYi9IMNGc6FWpFLw= github.com/libp2p/go-reuseport v0.0.1 h1:7PhkfH73VXfPJYKQ6JwS5I/eVcoyYi9IMNGc6FWpFLw=