mirror of
https://github.com/fluencelabs/go-libp2p-kad-dht
synced 2025-04-24 14:22:13 +00:00
Merge pull request #464 from MichaelMure/fix-metrics
metrics: record message/request event even in case of error
This commit is contained in:
commit
c2631d9569
60
dht_net.go
60
dht_net.go
@ -82,8 +82,9 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
|
||||
for {
|
||||
var req pb.Message
|
||||
msgbytes, err := r.ReadMsg()
|
||||
msgLen := len(msgbytes)
|
||||
if err != nil {
|
||||
defer r.ReleaseMsg(msgbytes)
|
||||
r.ReleaseMsg(msgbytes)
|
||||
if err == io.EOF {
|
||||
return true
|
||||
}
|
||||
@ -92,21 +93,25 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
|
||||
if err.Error() != "stream reset" {
|
||||
logger.Debugf("error reading message: %#v", err)
|
||||
}
|
||||
stats.RecordWithTags(
|
||||
ctx,
|
||||
[]tag.Mutator{tag.Upsert(metrics.KeyMessageType, "UNKNOWN")},
|
||||
metrics.ReceivedMessageErrors.M(1),
|
||||
)
|
||||
if msgLen > 0 {
|
||||
stats.RecordWithTags(ctx,
|
||||
[]tag.Mutator{tag.Upsert(metrics.KeyMessageType, "UNKNOWN")},
|
||||
metrics.ReceivedMessages.M(1),
|
||||
metrics.ReceivedMessageErrors.M(1),
|
||||
metrics.ReceivedBytes.M(int64(msgLen)),
|
||||
)
|
||||
}
|
||||
return false
|
||||
}
|
||||
err = req.Unmarshal(msgbytes)
|
||||
r.ReleaseMsg(msgbytes)
|
||||
if err != nil {
|
||||
logger.Debugf("error unmarshalling message: %#v", err)
|
||||
stats.RecordWithTags(
|
||||
ctx,
|
||||
stats.RecordWithTags(ctx,
|
||||
[]tag.Mutator{tag.Upsert(metrics.KeyMessageType, "UNKNOWN")},
|
||||
metrics.ReceivedMessages.M(1),
|
||||
metrics.ReceivedMessageErrors.M(1),
|
||||
metrics.ReceivedBytes.M(int64(msgLen)),
|
||||
)
|
||||
return false
|
||||
}
|
||||
@ -114,15 +119,13 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
|
||||
timer.Reset(dhtStreamIdleTimeout)
|
||||
|
||||
startTime := time.Now()
|
||||
ctx, _ := tag.New(
|
||||
ctx,
|
||||
ctx, _ := tag.New(ctx,
|
||||
tag.Upsert(metrics.KeyMessageType, req.GetType().String()),
|
||||
)
|
||||
|
||||
stats.Record(
|
||||
ctx,
|
||||
stats.Record(ctx,
|
||||
metrics.ReceivedMessages.M(1),
|
||||
metrics.ReceivedBytes.M(int64(req.Size())),
|
||||
metrics.ReceivedBytes.M(int64(msgLen)),
|
||||
)
|
||||
|
||||
handler := dht.handlerForMsgType(req.GetType())
|
||||
@ -166,7 +169,10 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message
|
||||
|
||||
ms, err := dht.messageSenderForPeer(ctx, p)
|
||||
if err != nil {
|
||||
stats.Record(ctx, metrics.SentRequestErrors.M(1))
|
||||
stats.Record(ctx,
|
||||
metrics.SentRequests.M(1),
|
||||
metrics.SentRequestErrors.M(1),
|
||||
)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -174,20 +180,20 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message
|
||||
|
||||
rpmes, err := ms.SendRequest(ctx, pmes)
|
||||
if err != nil {
|
||||
stats.Record(ctx, metrics.SentRequestErrors.M(1))
|
||||
stats.Record(ctx,
|
||||
metrics.SentRequests.M(1),
|
||||
metrics.SentRequestErrors.M(1),
|
||||
)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// update the peer (on valid msgs only)
|
||||
dht.updateFromMessage(ctx, p, rpmes)
|
||||
|
||||
stats.Record(
|
||||
ctx,
|
||||
stats.Record(ctx,
|
||||
metrics.SentRequests.M(1),
|
||||
metrics.SentBytes.M(int64(pmes.Size())),
|
||||
metrics.OutboundRequestLatency.M(
|
||||
float64(time.Since(start))/float64(time.Millisecond),
|
||||
),
|
||||
metrics.OutboundRequestLatency.M(float64(time.Since(start))/float64(time.Millisecond)),
|
||||
)
|
||||
dht.peerstore.RecordLatency(p, time.Since(start))
|
||||
logger.Event(ctx, "dhtReceivedMessage", dht.self, p, rpmes)
|
||||
@ -200,20 +206,26 @@ func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message
|
||||
|
||||
ms, err := dht.messageSenderForPeer(ctx, p)
|
||||
if err != nil {
|
||||
stats.Record(ctx, metrics.SentMessageErrors.M(1))
|
||||
stats.Record(ctx,
|
||||
metrics.SentMessages.M(1),
|
||||
metrics.SentMessageErrors.M(1),
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
if err := ms.SendMessage(ctx, pmes); err != nil {
|
||||
stats.Record(ctx, metrics.SentMessageErrors.M(1))
|
||||
stats.Record(ctx,
|
||||
metrics.SentMessages.M(1),
|
||||
metrics.SentMessageErrors.M(1),
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
stats.Record(
|
||||
ctx,
|
||||
stats.Record(ctx,
|
||||
metrics.SentMessages.M(1),
|
||||
metrics.SentBytes.M(int64(pmes.Size())),
|
||||
)
|
||||
|
||||
logger.Event(ctx, "dhtSentMessage", dht.self, p, pmes)
|
||||
return nil
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user