mirror of
https://github.com/fluencelabs/go-libp2p-kad-dht
synced 2025-04-25 14:52:14 +00:00
Replace delimited reader with msgio VarintReader
This commit is contained in:
parent
31765355df
commit
feee52a32b
17
dht_net.go
17
dht_net.go
@ -17,6 +17,7 @@ import (
|
|||||||
|
|
||||||
ggio "github.com/gogo/protobuf/io"
|
ggio "github.com/gogo/protobuf/io"
|
||||||
|
|
||||||
|
"github.com/libp2p/go-msgio"
|
||||||
"go.opencensus.io/stats"
|
"go.opencensus.io/stats"
|
||||||
"go.opencensus.io/tag"
|
"go.opencensus.io/tag"
|
||||||
)
|
)
|
||||||
@ -71,7 +72,7 @@ func (dht *IpfsDHT) handleNewStream(s network.Stream) {
|
|||||||
// Returns true on orderly completion of writes (so we can Close the stream).
|
// Returns true on orderly completion of writes (so we can Close the stream).
|
||||||
func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
|
func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
|
||||||
ctx := dht.ctx
|
ctx := dht.ctx
|
||||||
r := ggio.NewDelimitedReader(s, network.MessageSizeMax)
|
r := msgio.NewVarintReader(s)
|
||||||
|
|
||||||
mPeer := s.Conn().RemotePeer()
|
mPeer := s.Conn().RemotePeer()
|
||||||
|
|
||||||
@ -80,14 +81,24 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
|
|||||||
|
|
||||||
for {
|
for {
|
||||||
var req pb.Message
|
var req pb.Message
|
||||||
switch err := r.ReadMsg(&req); err {
|
msgbytes, err := r.ReadMsg()
|
||||||
|
if err != nil {
|
||||||
|
logger.Debugf("error reading message: %#v", err)
|
||||||
|
stats.RecordWithTags(
|
||||||
|
ctx,
|
||||||
|
[]tag.Mutator{tag.Upsert(metrics.KeyMessageType, "UNKNOWN")},
|
||||||
|
metrics.ReceivedMessageErrors.M(1),
|
||||||
|
)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
switch err := req.Unmarshal(msgbytes); err {
|
||||||
case io.EOF:
|
case io.EOF:
|
||||||
return true
|
return true
|
||||||
default:
|
default:
|
||||||
// This string test is necessary because there isn't a single stream reset error
|
// This string test is necessary because there isn't a single stream reset error
|
||||||
// instance in use.
|
// instance in use.
|
||||||
if err.Error() != "stream reset" {
|
if err.Error() != "stream reset" {
|
||||||
logger.Debugf("error reading message: %#v", err)
|
logger.Debugf("error unmarshalling message: %#v", err)
|
||||||
}
|
}
|
||||||
stats.RecordWithTags(
|
stats.RecordWithTags(
|
||||||
ctx,
|
ctx,
|
||||||
|
1
go.mod
1
go.mod
@ -17,6 +17,7 @@ require (
|
|||||||
github.com/libp2p/go-libp2p-routing v0.1.0
|
github.com/libp2p/go-libp2p-routing v0.1.0
|
||||||
github.com/libp2p/go-libp2p-swarm v0.1.0
|
github.com/libp2p/go-libp2p-swarm v0.1.0
|
||||||
github.com/libp2p/go-libp2p-testing v0.0.3
|
github.com/libp2p/go-libp2p-testing v0.0.3
|
||||||
|
github.com/libp2p/go-msgio v0.0.2
|
||||||
github.com/mr-tron/base58 v1.1.2
|
github.com/mr-tron/base58 v1.1.2
|
||||||
github.com/multiformats/go-multiaddr v0.0.4
|
github.com/multiformats/go-multiaddr v0.0.4
|
||||||
github.com/multiformats/go-multiaddr-dns v0.0.2
|
github.com/multiformats/go-multiaddr-dns v0.0.2
|
||||||
|
Loading…
x
Reference in New Issue
Block a user