fix up FindProvidersAsync

This commit is contained in:
Jeromy 2014-10-11 10:43:54 -07:00 committed by Juan Batiz-Benet
parent 4429ee65e3
commit c4d9c231dd
3 changed files with 20 additions and 76 deletions

6
dht.go
View File

@ -368,8 +368,8 @@ func (dht *IpfsDHT) Update(p *peer.Peer) {
// after some deadline of inactivity.
}
// Find looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
func (dht *IpfsDHT) Find(id peer.ID) (*peer.Peer, *kb.RoutingTable) {
// FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
func (dht *IpfsDHT) FindLocal(id peer.ID) (*peer.Peer, *kb.RoutingTable) {
for _, table := range dht.routingTables {
p := table.Find(id)
if p != nil {
@ -465,7 +465,7 @@ func (dht *IpfsDHT) peerFromInfo(pbp *Message_Peer) (*peer.Peer, error) {
p, _ := dht.peerstore.Get(id)
if p == nil {
p, _ = dht.Find(id)
p, _ = dht.FindLocal(id)
if p != nil {
panic("somehow peer not getting into peerstore")
}

View File

@ -227,13 +227,16 @@ func TestProvides(t *testing.T) {
time.Sleep(time.Millisecond * 60)
ctxT, _ := context.WithTimeout(context.Background(), time.Second)
provs, err := dhts[0].FindProviders(ctxT, u.Key("hello"))
if err != nil {
t.Fatal(err)
}
provchan := dhts[0].FindProvidersAsync(ctxT, u.Key("hello"), 1)
if len(provs) != 1 {
t.Fatal("Didnt get back providers")
after := time.After(time.Second)
select {
case prov := <-provchan:
if prov == nil {
t.Fatal("Got back nil provider")
}
case <-after:
t.Fatal("Did not get a provider back.")
}
}

View File

@ -3,6 +3,7 @@ package dht
import (
"bytes"
"encoding/json"
"sync"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
@ -117,26 +118,7 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error {
return nil
}
// NB: not actually async. Used to keep the interface consistent while the
// actual async method, FindProvidersAsync2 is under construction
func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int) <-chan *peer.Peer {
ch := make(chan *peer.Peer)
providers, err := dht.FindProviders(ctx, key)
if err != nil {
close(ch)
return ch
}
go func() {
defer close(ch)
for _, p := range providers {
ch <- p
}
}()
return ch
}
// FIXME: there's a bug here!
func (dht *IpfsDHT) FindProvidersAsync2(ctx context.Context, key u.Key, count int) <-chan *peer.Peer {
peerOut := make(chan *peer.Peer, count)
go func() {
ps := newPeerSet()
@ -151,9 +133,12 @@ func (dht *IpfsDHT) FindProvidersAsync2(ctx context.Context, key u.Key, count in
}
}
wg := new(sync.WaitGroup)
peers := dht.routingTables[0].NearestPeers(kb.ConvertKey(key), AlphaValue)
for _, pp := range peers {
wg.Add(1)
go func(p *peer.Peer) {
defer wg.Done()
pmes, err := dht.findProvidersSingle(ctx, p, key, 0)
if err != nil {
log.Error("%s", err)
@ -162,7 +147,8 @@ func (dht *IpfsDHT) FindProvidersAsync2(ctx context.Context, key u.Key, count in
dht.addPeerListAsync(key, pmes.GetProviderPeers(), ps, count, peerOut)
}(pp)
}
wg.Wait()
close(peerOut)
}()
return peerOut
}
@ -186,61 +172,16 @@ func (dht *IpfsDHT) addPeerListAsync(k u.Key, peers []*Message_Peer, ps *peerSet
}
}
// FindProviders searches for peers who can provide the value for given key.
func (dht *IpfsDHT) FindProviders(ctx context.Context, key u.Key) ([]*peer.Peer, error) {
// get closest peer
log.Debug("Find providers for: '%s'", key)
p := dht.routingTables[0].NearestPeer(kb.ConvertKey(key))
if p == nil {
log.Warning("Got no nearest peer for find providers: '%s'", key)
return nil, nil
}
for level := 0; level < len(dht.routingTables); {
// attempt retrieving providers
pmes, err := dht.findProvidersSingle(ctx, p, key, level)
if err != nil {
return nil, err
}
// handle providers
provs := pmes.GetProviderPeers()
if provs != nil {
log.Debug("Got providers back from findProviders call!")
return dht.addProviders(key, provs), nil
}
log.Debug("Didnt get providers, just closer peers.")
closer := pmes.GetCloserPeers()
if len(closer) == 0 {
level++
continue
}
np, err := dht.peerFromInfo(closer[0])
if err != nil {
log.Debug("no peerFromInfo")
level++
continue
}
p = np
}
return nil, u.ErrNotFound
}
// Find specific Peer
// FindPeer searches for a peer with given ID.
func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (*peer.Peer, error) {
// Check if were already connected to them
p, _ := dht.Find(id)
p, _ := dht.FindLocal(id)
if p != nil {
return p, nil
}
// @whyrusleeping why is this here? doesn't the dht.Find above cover it?
routeLevel := 0
p = dht.routingTables[routeLevel].NearestPeer(kb.ConvertPeerID(id))
if p == nil {
@ -277,7 +218,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (*peer.Peer, error
func (dht *IpfsDHT) findPeerMultiple(ctx context.Context, id peer.ID) (*peer.Peer, error) {
// Check if were already connected to them
p, _ := dht.Find(id)
p, _ := dht.FindLocal(id)
if p != nil {
return p, nil
}