mirror of
https://github.com/fluencelabs/go-libp2p-kad-dht
synced 2025-04-24 22:32:13 +00:00
Set errors on dht event logs
License: MIT Signed-off-by: ForrestWeston <forrest@protocol.ai>
This commit is contained in:
parent
70fc1084b6
commit
24c9006902
12
dht.go
12
dht.go
@ -200,7 +200,8 @@ func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID, key string) (
|
||||
"peer": p,
|
||||
}
|
||||
|
||||
defer log.EventBegin(ctx, "getValueSingle", meta).Done()
|
||||
eip := log.EventBegin(ctx, "getValueSingle", meta)
|
||||
defer eip.Done()
|
||||
|
||||
pmes := pb.NewMessage(pb.Message_GET_VALUE, key, 0)
|
||||
resp, err := dht.sendRequest(ctx, p, pmes)
|
||||
@ -211,6 +212,7 @@ func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID, key string) (
|
||||
log.Warningf("read timeout: %s %s", p.Pretty(), key)
|
||||
fallthrough
|
||||
default:
|
||||
eip.SetError(err)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
@ -283,7 +285,8 @@ func (dht *IpfsDHT) FindLocal(id peer.ID) pstore.PeerInfo {
|
||||
|
||||
// findPeerSingle asks peer 'p' if they know where the peer with id 'id' is
|
||||
func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (*pb.Message, error) {
|
||||
defer log.EventBegin(ctx, "findPeerSingle", p, id).Done()
|
||||
eip := log.EventBegin(ctx, "findPeerSingle", p, id)
|
||||
defer eip.Done()
|
||||
|
||||
pmes := pb.NewMessage(pb.Message_FIND_NODE, string(id), 0)
|
||||
resp, err := dht.sendRequest(ctx, p, pmes)
|
||||
@ -294,12 +297,14 @@ func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (
|
||||
log.Warningf("read timeout: %s %s", p.Pretty(), id)
|
||||
fallthrough
|
||||
default:
|
||||
eip.SetError(err)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key *cid.Cid) (*pb.Message, error) {
|
||||
defer log.EventBegin(ctx, "findProvidersSingle", p, key).Done()
|
||||
eip := log.EventBegin(ctx, "findProvidersSingle", p, key)
|
||||
defer eip.Done()
|
||||
|
||||
pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, key.KeyString(), 0)
|
||||
resp, err := dht.sendRequest(ctx, p, pmes)
|
||||
@ -310,6 +315,7 @@ func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key *cid
|
||||
log.Warningf("read timeout: %s %s", p.Pretty(), key)
|
||||
fallthrough
|
||||
default:
|
||||
eip.SetError(err)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
31
handlers.go
31
handlers.go
@ -43,8 +43,14 @@ func (dht *IpfsDHT) handlerForMsgType(t pb.Message_MessageType) dhtHandler {
|
||||
}
|
||||
}
|
||||
|
||||
func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
|
||||
defer log.EventBegin(ctx, "handleGetValue", p).Done()
|
||||
func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Message) (_ *pb.Message, err error) {
|
||||
eip := log.EventBegin(ctx, "handleGetValue", p)
|
||||
defer func() {
|
||||
if err != nil {
|
||||
eip.SetError(err)
|
||||
}
|
||||
eip.Done()
|
||||
}()
|
||||
log.Debugf("%s handleGetValue for key: %s", dht.self, pmes.GetKey())
|
||||
|
||||
// setup response
|
||||
@ -147,8 +153,15 @@ func (dht *IpfsDHT) checkLocalDatastore(k string) (*recpb.Record, error) {
|
||||
}
|
||||
|
||||
// Store a value in this peer local storage
|
||||
func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
|
||||
defer log.EventBegin(ctx, "handlePutValue", p).Done()
|
||||
func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Message) (_ *pb.Message, err error) {
|
||||
eip := log.EventBegin(ctx, "handlePutValue", p)
|
||||
defer func() {
|
||||
if err != nil {
|
||||
eip.SetError(err)
|
||||
}
|
||||
eip.Done()
|
||||
}()
|
||||
|
||||
dskey := convertToDsKey(pmes.GetKey())
|
||||
|
||||
rec := pmes.GetRecord()
|
||||
@ -157,7 +170,7 @@ func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Mess
|
||||
return nil, errors.New("nil record")
|
||||
}
|
||||
|
||||
if err := dht.verifyRecordLocally(rec); err != nil {
|
||||
if err = dht.verifyRecordLocally(rec); err != nil {
|
||||
log.Warningf("Bad dht record in PUT from: %s. %s", peer.ID(pmes.GetRecord().GetAuthor()), err)
|
||||
return nil, err
|
||||
}
|
||||
@ -214,11 +227,13 @@ func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.ID, pmes *pb.Mess
|
||||
func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
|
||||
lm := make(lgbl.DeferredMap)
|
||||
lm["peer"] = func() interface{} { return p.Pretty() }
|
||||
defer log.EventBegin(ctx, "handleGetProviders", lm).Done()
|
||||
eip := log.EventBegin(ctx, "handleGetProviders", lm)
|
||||
defer eip.Done()
|
||||
|
||||
resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
|
||||
c, err := cid.Cast([]byte(pmes.GetKey()))
|
||||
if err != nil {
|
||||
eip.SetError(err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -263,10 +278,12 @@ func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb.
|
||||
func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
|
||||
lm := make(lgbl.DeferredMap)
|
||||
lm["peer"] = func() interface{} { return p.Pretty() }
|
||||
eip := log.EventBegin(ctx, "handleAddProvider", lm)
|
||||
defer eip.Done()
|
||||
|
||||
defer log.EventBegin(ctx, "handleAddProvider", lm).Done()
|
||||
c, err := cid.Cast([]byte(pmes.GetKey()))
|
||||
if err != nil {
|
||||
eip.SetError(err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user