rewrite findpeer and other dht tweaks

This commit is contained in:
Jeromy 2014-10-24 18:32:28 -07:00
parent 15533c8fa3
commit d817b84a64
3 changed files with 63 additions and 82 deletions

7
dht.go
View File

@ -384,18 +384,11 @@ func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.Peer, id peer.ID,
return dht.sendRequest(ctx, p, pmes)
}
func (dht *IpfsDHT) printTables() {
for _, route := range dht.routingTables {
route.Print()
}
}
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.Peer, key u.Key, level int) (*Message, error) {
pmes := newMessage(Message_GET_PROVIDERS, string(key), level)
return dht.sendRequest(ctx, p, pmes)
}
// TODO: Could be done async
func (dht *IpfsDHT) addProviders(key u.Key, peers []*Message_Peer) []peer.Peer {
var provArr []peer.Peer
for _, prov := range peers {

View File

@ -283,7 +283,13 @@ func TestProvidesAsync(t *testing.T) {
ctxT, _ := context.WithTimeout(ctx, time.Millisecond*300)
provs := dhts[0].FindProvidersAsync(ctxT, u.Key("hello"), 5)
select {
case p := <-provs:
case p, ok := <-provs:
if !ok {
t.Fatal("Provider channel was closed...")
}
if p == nil {
t.Fatal("Got back nil provider!")
}
if !p.ID().Equal(dhts[3].self.ID()) {
t.Fatalf("got a provider, but not the right one. %s", p)
}

View File

@ -61,7 +61,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) {
closest := dht.routingTables[routeLevel].NearestPeers(kb.ConvertKey(key), PoolSize)
if closest == nil || len(closest) == 0 {
log.Warning("Got no peers back from routing table!")
return nil, nil
return nil, kb.ErrLookupFailure
}
// setup the Query
@ -152,22 +152,32 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int
return peerOut
}
//TODO: this function could also be done asynchronously
func (dht *IpfsDHT) addPeerListAsync(k u.Key, peers []*Message_Peer, ps *peerSet, count int, out chan peer.Peer) {
done := make(chan struct{})
for _, pbp := range peers {
go func(mp *Message_Peer) {
defer func() { done <- struct{}{} }()
// construct new peer
p, err := dht.ensureConnectedToPeer(mp)
if err != nil {
log.Error("%s", err)
return
}
if p == nil {
log.Error("Got nil peer from ensureConnectedToPeer")
return
}
// construct new peer
p, err := dht.ensureConnectedToPeer(pbp)
if err != nil {
continue
}
dht.providers.AddProvider(k, p)
if ps.AddIfSmallerThan(p, count) {
out <- p
} else if ps.Size() >= count {
return
}
dht.providers.AddProvider(k, p)
if ps.AddIfSmallerThan(p, count) {
out <- p
} else if ps.Size() >= count {
return
}
}(pbp)
}
for _ = range peers {
<-done
}
}
@ -182,92 +192,64 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.Peer, error)
}
routeLevel := 0
p = dht.routingTables[routeLevel].NearestPeer(kb.ConvertPeerID(id))
if p == nil {
return nil, nil
}
if p.ID().Equal(id) {
return p, nil
closest := dht.routingTables[routeLevel].NearestPeers(kb.ConvertPeerID(id), AlphaValue)
if closest == nil || len(closest) == 0 {
return nil, kb.ErrLookupFailure
}
for routeLevel < len(dht.routingTables) {
pmes, err := dht.findPeerSingle(ctx, p, id, routeLevel)
plist := pmes.GetCloserPeers()
if plist == nil || len(plist) == 0 {
routeLevel++
continue
// Sanity...
for _, p := range closest {
if p.ID().Equal(id) {
log.Error("Found target peer in list of closest peers...")
return p, nil
}
found := plist[0]
nxtPeer, err := dht.ensureConnectedToPeer(found)
if err != nil {
routeLevel++
continue
}
if nxtPeer.ID().Equal(id) {
return nxtPeer, nil
}
p = nxtPeer
}
return nil, u.ErrNotFound
}
func (dht *IpfsDHT) findPeerMultiple(ctx context.Context, id peer.ID) (peer.Peer, error) {
// Check if were already connected to them
p, _ := dht.FindLocal(id)
if p != nil {
return p, nil
}
// get the peers we need to announce to
routeLevel := 0
peers := dht.routingTables[routeLevel].NearestPeers(kb.ConvertPeerID(id), AlphaValue)
if len(peers) == 0 {
return nil, nil
}
// setup query function
// setup the Query
query := newQuery(u.Key(id), dht.dialer, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
pmes, err := dht.findPeerSingle(ctx, p, id, routeLevel)
if err != nil {
log.Error("%s getPeer error: %v", dht.self, err)
return nil, err
}
plist := pmes.GetCloserPeers()
if len(plist) == 0 {
routeLevel++
}
nxtprs := make([]peer.Peer, len(plist))
for i, fp := range plist {
nxtp, err := dht.peerFromInfo(fp)
closer := pmes.GetCloserPeers()
var clpeers []peer.Peer
for _, pbp := range closer {
np, err := dht.getPeer(peer.ID(pbp.GetId()))
if err != nil {
log.Error("%s findPeer error: %v", dht.self, err)
log.Warning("Received invalid peer from query")
continue
}
if nxtp.ID().Equal(id) {
return &dhtQueryResult{peer: nxtp, success: true}, nil
ma, err := pbp.Address()
if err != nil {
log.Warning("Received peer with bad or missing address.")
continue
}
nxtprs[i] = nxtp
np.AddAddress(ma)
if pbp.GetId() == string(id) {
return &dhtQueryResult{
peer: np,
success: true,
}, nil
}
clpeers = append(clpeers, np)
}
return &dhtQueryResult{closerPeers: nxtprs}, nil
return &dhtQueryResult{closerPeers: clpeers}, nil
})
result, err := query.Run(ctx, peers)
// run it!
result, err := query.Run(ctx, closest)
if err != nil {
return nil, err
}
log.Debug("FindPeer %v %v", id, result.success)
if result.peer == nil {
return nil, u.ErrNotFound
}
return result.peer, nil
}