mirror of
https://github.com/fluencelabs/go-libp2p-kad-dht
synced 2025-04-24 22:32:13 +00:00
dht: update to use net.LocalPeer
This commit is contained in:
parent
e1b5933641
commit
3dd6ee1f86
59
dht.go
59
dht.go
@ -274,14 +274,11 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.Peer,
|
||||
}
|
||||
|
||||
// Perhaps we were given closer peers
|
||||
var peers []peer.Peer
|
||||
for _, pb := range pmes.GetCloserPeers() {
|
||||
pr, err := dht.peerFromInfo(pb)
|
||||
peers, errs := pb.PBPeersToPeers(dht.peerstore, pmes.GetCloserPeers())
|
||||
for _, err := range errs {
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
continue
|
||||
}
|
||||
peers = append(peers, pr)
|
||||
}
|
||||
|
||||
if len(peers) > 0 {
|
||||
@ -426,22 +423,20 @@ func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.Peer, key u.
|
||||
return dht.sendRequest(ctx, p, pmes)
|
||||
}
|
||||
|
||||
func (dht *IpfsDHT) addProviders(key u.Key, peers []*pb.Message_Peer) []peer.Peer {
|
||||
func (dht *IpfsDHT) addProviders(key u.Key, pbps []*pb.Message_Peer) []peer.Peer {
|
||||
peers, errs := pb.PBPeersToPeers(dht.peerstore, pbps)
|
||||
for _, err := range errs {
|
||||
log.Errorf("error converting peer: %v", err)
|
||||
}
|
||||
|
||||
var provArr []peer.Peer
|
||||
for _, prov := range peers {
|
||||
p, err := dht.peerFromInfo(prov)
|
||||
if err != nil {
|
||||
log.Errorf("error getting peer from info: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
log.Debugf("%s adding provider: %s for %s", dht.self, p, key)
|
||||
|
||||
for _, p := range peers {
|
||||
// Dont add outselves to the list
|
||||
if p.ID().Equal(dht.self.ID()) {
|
||||
continue
|
||||
}
|
||||
|
||||
log.Debugf("%s adding provider: %s for %s", dht.self, p, key)
|
||||
// TODO(jbenet) ensure providers is idempotent
|
||||
dht.providers.AddProvider(key, p)
|
||||
provArr = append(provArr, p)
|
||||
@ -500,40 +495,16 @@ func (dht *IpfsDHT) getPeer(id peer.ID) (peer.Peer, error) {
|
||||
return p, nil
|
||||
}
|
||||
|
||||
// peerFromInfo returns a peer using info in the protobuf peer struct
|
||||
// to lookup or create a peer
|
||||
func (dht *IpfsDHT) peerFromInfo(pbp *pb.Message_Peer) (peer.Peer, error) {
|
||||
|
||||
id := peer.ID(pbp.GetId())
|
||||
|
||||
// bail out if it's ourselves
|
||||
//TODO(jbenet) not sure this should be an error _here_
|
||||
if id.Equal(dht.self.ID()) {
|
||||
return nil, errors.New("found self")
|
||||
}
|
||||
|
||||
p, err := dht.getPeer(id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// add addresses we've just discovered
|
||||
maddrs, err := pbp.Addresses()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, maddr := range maddrs {
|
||||
p.AddAddress(maddr)
|
||||
}
|
||||
return p, nil
|
||||
}
|
||||
|
||||
func (dht *IpfsDHT) ensureConnectedToPeer(ctx context.Context, pbp *pb.Message_Peer) (peer.Peer, error) {
|
||||
p, err := dht.peerFromInfo(pbp)
|
||||
p, err := pb.PBPeerToPeer(dht.peerstore, pbp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if dht.dialer.LocalPeer().ID().Equal(p.ID()) {
|
||||
return nil, errors.New("attempting to ensure connection to self")
|
||||
}
|
||||
|
||||
// dial connection
|
||||
err = dht.dialer.DialPeer(ctx, p)
|
||||
return p, err
|
||||
|
@ -2,6 +2,7 @@ package dht_pb
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
||||
|
||||
@ -32,6 +33,24 @@ func peerToPBPeer(p peer.Peer) *Message_Peer {
|
||||
return pbp
|
||||
}
|
||||
|
||||
// PBPeerToPeer turns a *Message_Peer into its peer.Peer counterpart
|
||||
func PBPeerToPeer(ps peer.Peerstore, pbp *Message_Peer) (peer.Peer, error) {
|
||||
p, err := ps.FindOrCreate(peer.ID(pbp.GetId()))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to get peer from peerstore: %s", err)
|
||||
}
|
||||
|
||||
// add addresses
|
||||
maddrs, err := pbp.Addresses()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Received peer with bad or missing addresses: %s", pbp.Addrs)
|
||||
}
|
||||
for _, maddr := range maddrs {
|
||||
p.AddAddress(maddr)
|
||||
}
|
||||
return p, nil
|
||||
}
|
||||
|
||||
// RawPeersToPBPeers converts a slice of Peers into a slice of *Message_Peers,
|
||||
// ready to go out on the wire.
|
||||
func RawPeersToPBPeers(peers []peer.Peer) []*Message_Peer {
|
||||
@ -55,6 +74,24 @@ func PeersToPBPeers(d inet.Dialer, peers []peer.Peer) []*Message_Peer {
|
||||
return pbps
|
||||
}
|
||||
|
||||
// PBPeersToPeers converts given []*Message_Peer into a set of []peer.Peer
|
||||
// Returns two slices, one of peers, and one of errors. The slice of peers
|
||||
// will ONLY contain successfully converted peers. The slice of errors contains
|
||||
// whether each input Message_Peer was successfully converted.
|
||||
func PBPeersToPeers(ps peer.Peerstore, pbps []*Message_Peer) ([]peer.Peer, []error) {
|
||||
errs := make([]error, len(pbps))
|
||||
peers := make([]peer.Peer, 0, len(pbps))
|
||||
for i, pbp := range pbps {
|
||||
p, err := PBPeerToPeer(ps, pbp)
|
||||
if err != nil {
|
||||
errs[i] = err
|
||||
} else {
|
||||
peers = append(peers, p)
|
||||
}
|
||||
}
|
||||
return peers, errs
|
||||
}
|
||||
|
||||
// Addresses returns a multiaddr associated with the Message_Peer entry
|
||||
func (m *Message_Peer) Addresses() ([]ma.Multiaddr, error) {
|
||||
if m == nil {
|
||||
|
7
query.go
7
query.go
@ -161,7 +161,12 @@ func (r *dhtQueryRunner) addPeerToQuery(next peer.Peer, benchmark peer.Peer) {
|
||||
return
|
||||
}
|
||||
|
||||
// if new peer further away than whom we got it from, bother (loops)
|
||||
// if new peer is ourselves...
|
||||
if next.ID().Equal(r.query.dialer.LocalPeer().ID()) {
|
||||
return
|
||||
}
|
||||
|
||||
// if new peer further away than whom we got it from, don't bother (loops)
|
||||
if benchmark != nil && kb.Closer(benchmark.ID(), next.ID(), r.query.key) {
|
||||
return
|
||||
}
|
||||
|
24
routing.go
24
routing.go
@ -234,31 +234,21 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.Peer, error)
|
||||
}
|
||||
|
||||
closer := pmes.GetCloserPeers()
|
||||
var clpeers []peer.Peer
|
||||
for _, pbp := range closer {
|
||||
np, err := dht.getPeer(peer.ID(pbp.GetId()))
|
||||
clpeers, errs := pb.PBPeersToPeers(dht.peerstore, closer)
|
||||
for _, err := range errs {
|
||||
if err != nil {
|
||||
log.Warningf("Received invalid peer from query: %v", err)
|
||||
continue
|
||||
log.Warning(err)
|
||||
}
|
||||
}
|
||||
|
||||
// add addresses
|
||||
maddrs, err := pbp.Addresses()
|
||||
if err != nil {
|
||||
log.Warning("Received peer with bad or missing addresses: %s", pbp.Addrs)
|
||||
continue
|
||||
}
|
||||
for _, maddr := range maddrs {
|
||||
np.AddAddress(maddr)
|
||||
}
|
||||
|
||||
if pbp.GetId() == string(id) {
|
||||
// see it we got the peer here
|
||||
for _, np := range clpeers {
|
||||
if string(np.ID()) == string(id) {
|
||||
return &dhtQueryResult{
|
||||
peer: np,
|
||||
success: true,
|
||||
}, nil
|
||||
}
|
||||
clpeers = append(clpeers, np)
|
||||
}
|
||||
|
||||
return &dhtQueryResult{closerPeers: clpeers}, nil
|
||||
|
Loading…
x
Reference in New Issue
Block a user