fix Search/GetValue to be Kad compliant. Default quorum is now 0 which means do not abort the query early

This commit is contained in:
Adin Schmahmann
2020-01-21 09:53:07 -08:00
committed by Steven Allen
parent c4e94ce0fe
commit 68b116932f
5 changed files with 145 additions and 196 deletions

View File

@@ -171,106 +171,61 @@ func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing
responsesNeeded = getQuorum(&cfg, 0)
}
valCh := dht.getValues(ctx, key, responsesNeeded)
valCh, queries := dht.getValues(ctx, key, func() bool { return false })
out := make(chan []byte)
go func() {
defer close(out)
maxVals := responsesNeeded
if maxVals < 0 {
maxVals = defaultQuorum * 4 // we want some upper bound on how
// much correctional entries we will send
best, peersWithBest := dht.searchValueQuorum(ctx, key, valCh, out, responsesNeeded)
if best == nil {
return
}
// vals is used collect entries we got so far and send corrections to peers
// when we exit this function
vals := make([]RecvdVal, 0, maxVals)
var best *RecvdVal
defer func() {
if len(vals) <= 1 || best == nil {
updatePeers := make([]peer.ID, 0, dht.bucketSize)
select {
case q := <-queries:
if len(q) < 1 {
return
}
fixupRec := record.MakePutRecord(key, best.Val)
correctingEntries := make(map[peer.ID]RecvdVal)
correctingPeers := make([]peer.ID, 0, len(vals))
for _, v := range vals {
// if someone sent us a different 'less-valid' record, lets correct them
if !bytes.Equal(v.Val, best.Val) {
correctingEntries[v.From] = v
correctingPeers = append(correctingPeers, v.From)
peers := q[0].globallyQueriedPeers.Peers()
peers = kb.SortClosestPeers(peers, kb.ConvertKey(key))
for _, p := range peers {
if _, ok := peersWithBest[p]; !ok {
updatePeers = append(updatePeers, p)
}
}
// only correct the peers closest to the target
correctingPeers = kb.SortClosestPeers(correctingPeers, kb.ConvertKey(key))
if numCorrectingPeers := len(correctingPeers); maxVals > numCorrectingPeers {
maxVals = numCorrectingPeers
}
for _, p := range correctingPeers[:maxVals] {
go func(v RecvdVal) {
if v.From == dht.self {
err := dht.putLocal(key, fixupRec)
if err != nil {
logger.Error("Error correcting local dht entry:", err)
}
return
}
ctx, cancel := context.WithTimeout(dht.Context(), time.Second*30)
defer cancel()
err := dht.putValueToPeer(ctx, v.From, fixupRec)
if err != nil {
logger.Debug("Error correcting DHT entry: ", err)
}
}(correctingEntries[p])
}
}()
for {
select {
case v, ok := <-valCh:
if !ok {
return
}
vals = append(vals, v)
if v.Val == nil {
continue
}
// Select best value
if best != nil {
if bytes.Equal(best.Val, v.Val) {
continue
}
sel, err := dht.Validator.Select(key, [][]byte{best.Val, v.Val})
if err != nil {
logger.Warning("Failed to select dht key: ", err)
continue
}
if sel != 1 {
continue
}
}
best = &v
select {
case out <- v.Val:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
dht.updatePeerValues(dht.Context(), key, best, updatePeers)
}()
return out, nil
}
func (dht *IpfsDHT) searchValueQuorum(ctx context.Context, key string, valCh <-chan RecvdVal,
out chan<- []byte, nvals int) ([]byte, map[peer.ID]struct{}) {
numResponses := 0
return dht.processValues(ctx, key, valCh,
func(ctx context.Context, v RecvdVal, better bool) bool {
numResponses++
if better {
select {
case out <- v.Val:
case <-ctx.Done():
return false
}
}
if nvals > 0 && numResponses > nvals {
return true
}
return false
})
}
// GetValues gets nvals values corresponding to the given key.
func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []RecvdVal, err error) {
if !dht.enableValues {
@@ -281,92 +236,105 @@ func (dht *IpfsDHT) GetValues(ctx context.Context, key string, nvals int) (_ []R
eip.Append(loggableKey(key))
defer eip.Done()
valCh := dht.getValues(ctx, key, nvals)
queryCtx, cancel := context.WithCancel(ctx)
valCh, _ := dht.getValues(queryCtx, key, func() bool {
return false
})
out := make([]RecvdVal, 0, nvals)
for val := range valCh {
out = append(out, val)
if len(out) == nvals {
cancel()
}
}
return out, ctx.Err()
}
func (dht *IpfsDHT) getValues(ctx context.Context, key string, nvals int) <-chan RecvdVal {
valCh := make(chan RecvdVal, 1)
func (dht *IpfsDHT) processValues(ctx context.Context, key string, vals <-chan RecvdVal,
newVal func(ctx context.Context, v RecvdVal, better bool) bool) (best []byte, peersWithBest map[peer.ID]struct{}) {
aborted := false
valSignal := make(chan struct{}, 1)
valsSent := 0
valsSentMx := sync.RWMutex{}
vbuf := make([]RecvdVal, 0, 32)
vbufMx := sync.Mutex{}
valEmitCtx, cancelValEmit := context.WithCancel(ctx)
go func() {
defer close(valCh)
for {
var next RecvdVal
trySetNext := func() bool {
vbufMx.Lock()
notEmpty := len(vbuf) > 0
if notEmpty {
next = vbuf[0]
vbuf = vbuf[1:]
}
vbufMx.Unlock()
return notEmpty
}
if !trySetNext() {
signal:
for {
select {
case <-valSignal:
if trySetNext() {
break signal
}
case <-valEmitCtx.Done():
if !trySetNext() {
return
}
for {
select {
case valCh <- next:
valsSentMx.Lock()
valsSent++
valsSentMx.Unlock()
if !trySetNext() {
return
}
default:
return
}
}
}
}
}
send:
for {
select {
case valCh <- next:
valsSentMx.Lock()
valsSent++
valsSentMx.Unlock()
if !trySetNext() {
break send
}
case <-valEmitCtx.Done():
break send
}
}
loop:
for {
if aborted {
return best, nil
}
}()
select {
case v, ok := <-vals:
if !ok {
break loop
}
// Select best value
if best != nil {
if bytes.Equal(best, v.Val) {
peersWithBest[v.From] = struct{}{}
aborted = newVal(ctx, v, false)
continue
}
sel, err := dht.Validator.Select(key, [][]byte{best, v.Val})
if err != nil {
logger.Warning("Failed to select dht key: ", err)
continue
}
if sel != 1 {
aborted = newVal(ctx, v, false)
continue
}
}
peersWithBest = make(map[peer.ID]struct{})
peersWithBest[v.From] = struct{}{}
best = v.Val
aborted = newVal(ctx, v, true)
case <-ctx.Done():
return
}
}
if aborted {
return best, nil
}
return
}
func (dht *IpfsDHT) updatePeerValues(ctx context.Context, key string, val []byte, peers []peer.ID) {
fixupRec := record.MakePutRecord(key, val)
for _, p := range peers {
go func(p peer.ID) {
//TODO: Is this possible?
if p == dht.self {
err := dht.putLocal(key, fixupRec)
if err != nil {
logger.Error("Error correcting local dht entry:", err)
}
return
}
ctx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()
err := dht.putValueToPeer(ctx, p, fixupRec)
if err != nil {
logger.Debug("Error correcting DHT entry: ", err)
}
}(p)
}
}
func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopFn func() bool) (<-chan RecvdVal, <-chan []*qu) {
valCh := make(chan RecvdVal, 1)
queriesCh := make(chan []*qu, 1)
if rec, err := dht.getLocal(key); rec != nil && err == nil {
select {
case valCh <- RecvdVal{
Val: rec.GetValue(),
From: dht.self,
}:
case <-ctx.Done():
}
}
go func() {
queries := dht.runDisjointQueries(ctx, dht.d, key,
@@ -402,12 +370,10 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, nvals int) <-chan
From: p,
}
vbufMx.Lock()
vbuf = append(vbuf, rv)
vbufMx.Unlock()
select {
case valSignal <- struct{}{}:
default:
case valCh <- rv:
case <-ctx.Done():
return nil, ctx.Err()
}
}
@@ -421,16 +387,13 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, nvals int) <-chan
return peers, err
},
func(peerset *kpeerset.SortedPeerset) bool {
if nvals <= 0 {
return false
}
valsSentMx.RLock()
defer valsSentMx.RUnlock()
return valsSent >= nvals
return stopFn()
},
)
cancelValEmit()
close(valCh)
queriesCh <- queries
close(queriesCh)
shortcutTaken := false
for _, q := range queries {
@@ -447,7 +410,7 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, nvals int) <-chan
}
}()
return valCh
return valCh, queriesCh
}
// Provider abstraction for indirect stores.
@@ -667,19 +630,6 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo,
return pi, nil
}
peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue)
if len(peers) == 0 {
return peer.AddrInfo{}, kb.ErrLookupFailure
}
// Sanity...
for _, p := range peers {
if p == id {
logger.Debug("found target peer in list of closest peers...")
return dht.peerstore.PeerInfo(p), nil
}
}
queries := dht.runDisjointQueries(ctx, dht.d, string(id),
func(ctx context.Context, p peer.ID) ([]*peer.AddrInfo, error) {
// For DHT query command