mirror of
https://github.com/fluencelabs/go-libp2p-kad-dht
synced 2025-04-24 22:32:13 +00:00
fix publish fail on prexisting bad record
dont error out if prexisting record is bad, just grab its sequence number and continue on with the publish. License: MIT Signed-off-by: Jeromy <jeromyj@gmail.com>
This commit is contained in:
parent
d8e0925111
commit
d4724ae8e6
104
handlers.go
104
handlers.go
@ -3,6 +3,7 @@ package dht
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
|
||||
ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
|
||||
@ -10,6 +11,7 @@ import (
|
||||
key "github.com/ipfs/go-ipfs/blocks/key"
|
||||
peer "github.com/ipfs/go-ipfs/p2p/peer"
|
||||
pb "github.com/ipfs/go-ipfs/routing/dht/pb"
|
||||
u "github.com/ipfs/go-ipfs/util"
|
||||
lgbl "github.com/ipfs/go-ipfs/util/eventlog/loggables"
|
||||
)
|
||||
|
||||
@ -46,41 +48,17 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess
|
||||
resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
|
||||
|
||||
// first, is there even a key?
|
||||
k := pmes.GetKey()
|
||||
k := key.Key(pmes.GetKey())
|
||||
if k == "" {
|
||||
return nil, errors.New("handleGetValue but no key was provided")
|
||||
// TODO: send back an error response? could be bad, but the other node's hanging.
|
||||
}
|
||||
|
||||
// let's first check if we have the value locally.
|
||||
log.Debugf("%s handleGetValue looking into ds", dht.self)
|
||||
dskey := key.Key(k).DsKey()
|
||||
iVal, err := dht.datastore.Get(dskey)
|
||||
log.Debugf("%s handleGetValue looking into ds GOT %v", dht.self, iVal)
|
||||
|
||||
// if we got an unexpected error, bail.
|
||||
if err != nil && err != ds.ErrNotFound {
|
||||
rec, err := dht.checkLocalDatastore(k)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// if we have the value, send it back
|
||||
if err == nil {
|
||||
log.Debugf("%s handleGetValue success!", dht.self)
|
||||
|
||||
byts, ok := iVal.([]byte)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("datastore had non byte-slice value for %v", dskey)
|
||||
}
|
||||
|
||||
rec := new(pb.Record)
|
||||
err := proto.Unmarshal(byts, rec)
|
||||
if err != nil {
|
||||
log.Debug("Failed to unmarshal dht record from datastore")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp.Record = rec
|
||||
}
|
||||
resp.Record = rec
|
||||
|
||||
// Find closest peer on given cluster to desired key and reply with that info
|
||||
closer := dht.betterPeersToQuery(pmes, p, CloserPeerCount)
|
||||
@ -102,6 +80,69 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (dht *IpfsDHT) checkLocalDatastore(k key.Key) (*pb.Record, error) {
|
||||
log.Debugf("%s handleGetValue looking into ds", dht.self)
|
||||
dskey := k.DsKey()
|
||||
iVal, err := dht.datastore.Get(dskey)
|
||||
log.Debugf("%s handleGetValue looking into ds GOT %v", dht.self, iVal)
|
||||
|
||||
if err == ds.ErrNotFound {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// if we got an unexpected error, bail.
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// if we have the value, send it back
|
||||
log.Debugf("%s handleGetValue success!", dht.self)
|
||||
|
||||
byts, ok := iVal.([]byte)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("datastore had non byte-slice value for %v", dskey)
|
||||
}
|
||||
|
||||
rec := new(pb.Record)
|
||||
err = proto.Unmarshal(byts, rec)
|
||||
if err != nil {
|
||||
log.Debug("Failed to unmarshal dht record from datastore")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// if its our record, dont bother checking the times on it
|
||||
if peer.ID(rec.GetAuthor()) == dht.self {
|
||||
return rec, nil
|
||||
}
|
||||
|
||||
var recordIsBad bool
|
||||
recvtime, err := u.ParseRFC3339(rec.GetTimeReceived())
|
||||
if err != nil {
|
||||
log.Info("either no receive time set on record, or it was invalid: ", err)
|
||||
recordIsBad = true
|
||||
}
|
||||
|
||||
if time.Now().Sub(recvtime) > MaxRecordAge {
|
||||
log.Debug("old record found, tossing.")
|
||||
recordIsBad = true
|
||||
}
|
||||
|
||||
// NOTE: we do not verify the record here beyond checking these timestamps.
|
||||
// we put the burden of checking the records on the requester as checking a record
|
||||
// may be computationally expensive
|
||||
|
||||
if recordIsBad {
|
||||
err := dht.datastore.Delete(dskey)
|
||||
if err != nil {
|
||||
log.Error("Failed to delete bad record from datastore: ", err)
|
||||
}
|
||||
|
||||
return nil, nil // can treat this as not having the record at all
|
||||
}
|
||||
|
||||
return rec, nil
|
||||
}
|
||||
|
||||
// Store a value in this peer local storage
|
||||
func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
|
||||
defer log.EventBegin(ctx, "handlePutValue", p).Done()
|
||||
@ -112,7 +153,12 @@ func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Mess
|
||||
return nil, err
|
||||
}
|
||||
|
||||
data, err := proto.Marshal(pmes.GetRecord())
|
||||
rec := pmes.GetRecord()
|
||||
|
||||
// record the time we receive every record
|
||||
rec.TimeReceived = proto.String(u.FormatRFC3339(time.Now()))
|
||||
|
||||
data, err := proto.Marshal(rec)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
15
pb/dht.pb.go
15
pb/dht.pb.go
@ -14,7 +14,7 @@ It has these top-level messages:
|
||||
*/
|
||||
package dht_pb
|
||||
|
||||
import proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
|
||||
import proto "github.com/gogo/protobuf/proto"
|
||||
import math "math"
|
||||
|
||||
// Reference imports to suppress errors if they are not otherwise used.
|
||||
@ -221,8 +221,10 @@ type Record struct {
|
||||
// hash of the authors public key
|
||||
Author *string `protobuf:"bytes,3,opt,name=author" json:"author,omitempty"`
|
||||
// A PKI signature for the key+value+author
|
||||
Signature []byte `protobuf:"bytes,4,opt,name=signature" json:"signature,omitempty"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
Signature []byte `protobuf:"bytes,4,opt,name=signature" json:"signature,omitempty"`
|
||||
// Time the record was received, set by receiver
|
||||
TimeReceived *string `protobuf:"bytes,5,opt,name=timeReceived" json:"timeReceived,omitempty"`
|
||||
XXX_unrecognized []byte `json:"-"`
|
||||
}
|
||||
|
||||
func (m *Record) Reset() { *m = Record{} }
|
||||
@ -257,6 +259,13 @@ func (m *Record) GetSignature() []byte {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Record) GetTimeReceived() string {
|
||||
if m != nil && m.TimeReceived != nil {
|
||||
return *m.TimeReceived
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func init() {
|
||||
proto.RegisterEnum("dht.pb.Message_MessageType", Message_MessageType_name, Message_MessageType_value)
|
||||
proto.RegisterEnum("dht.pb.Message_ConnectionType", Message_ConnectionType_name, Message_ConnectionType_value)
|
||||
|
@ -75,4 +75,7 @@ message Record {
|
||||
|
||||
// A PKI signature for the key+value+author
|
||||
optional bytes signature = 4;
|
||||
|
||||
// Time the record was received, set by receiver
|
||||
optional string timeReceived = 5;
|
||||
}
|
||||
|
@ -2,6 +2,7 @@ package dht
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
ctxfrac "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-context/frac"
|
||||
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
|
||||
@ -12,6 +13,14 @@ import (
|
||||
record "github.com/ipfs/go-ipfs/routing/record"
|
||||
)
|
||||
|
||||
// MaxRecordAge specifies the maximum time that any node will hold onto a record
|
||||
// from the time its received. This does not apply to any other forms of validity that
|
||||
// the record may contain.
|
||||
// For example, a record may contain an ipns entry with an EOL saying its valid
|
||||
// until the year 2020 (a great time in the future). For that record to stick around
|
||||
// it must be rebroadcasted more frequently than once every 'MaxRecordAge'
|
||||
const MaxRecordAge = time.Hour * 36
|
||||
|
||||
func (dht *IpfsDHT) GetPublicKey(ctx context.Context, p peer.ID) (ci.PubKey, error) {
|
||||
log.Debugf("getPublicKey for: %s", p)
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user