Mirror of https://github.com/fluencelabs/go-libp2p-kad-dht (synced 2025-04-25 06:42:13 +00:00)
peer change: peer.Peer -> peer.ID
This is a major refactor of the entire codebase: it changes the monolithic peer.Peer into a peer.ID plus a peer.Peerstore.

Other changes:
- removed handshake3
- testutil: vastly simplified peer
- secio: bugfix + debugging logs
- testutil: RandKeyPair
- backpressure bugfix: w.o.w.
- peer: added hex enc/dec
- peer: added a PeerInfo struct. PeerInfo is a small struct used to pass around a peer with a set of addresses and keys. It is not meant to be a complete view of the system, but rather to model updates to the peerstore. It is used by things like the routing system.
- updated peer/queue + peerset
- latency metrics
- testutil: use crand for PeerID generation. RandPeerID generates random "valid" peer IDs. It does not need to generate keys, because it is as if we lost the key right away; it is fine to read some randomness and hash it. To generate proper keys and an ID, use:
    sk, pk, _ := testutil.RandKeyPair()
    id, _ := peer.IDFromPublicKey(pk)
  Also added a RandPeerIDFatal helper.
- removed old spipe
- updated seccat
- core: cleanup initIdentity
- removed old getFromPeerList
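As a minimal sketch of the model described above (not part of this commit; the helper name newLocalPeer is hypothetical, and the calls mirror those used in the dht_test.go changes below): keys and addresses are registered in a Peerstore under a peer.ID, and a PeerInfo carries only the ID plus addresses.

    package dht

    import (
        ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
        peer "github.com/jbenet/go-ipfs/peer"
        testutil "github.com/jbenet/go-ipfs/util/testutil"
    )

    // newLocalPeer is a hypothetical helper (not part of this commit) showing the
    // refactored flow: generate keys, derive the peer.ID from the public key, and
    // register keys and addresses in a Peerstore keyed by that ID.
    func newLocalPeer(addr ma.Multiaddr) (peer.ID, peer.Peerstore, peer.PeerInfo, error) {
        sk, pk, err := testutil.RandKeyPair(512)
        if err != nil {
            return "", nil, peer.PeerInfo{}, err
        }

        // The ID is derived from the public key; there is no monolithic peer.Peer object.
        p, err := peer.IDFromPublicKey(pk)
        if err != nil {
            return "", nil, peer.PeerInfo{}, err
        }

        ps := peer.NewPeerstore()
        ps.AddPrivKey(p, sk)
        ps.AddPubKey(p, pk)
        ps.AddAddress(p, addr)

        // PeerInfo is just ID + addresses: a peerstore update, not a full view of the peer.
        pi := peer.PeerInfo{ID: p, Addrs: []ma.Multiaddr{addr}}
        return p, ps, pi, nil
    }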
This commit is contained in:
parent de0913f98f
commit 3b37c43171
dht.go (188)
@ -34,11 +34,10 @@ const doPinging = false
|
||||
// It is used to implement the base IpfsRouting module.
|
||||
type IpfsDHT struct {
|
||||
network inet.Network // the network services we need
|
||||
self peer.Peer // Local peer (yourself)
|
||||
peerstore peer.Peerstore // Other peers
|
||||
self peer.ID // Local peer (yourself)
|
||||
peerstore peer.Peerstore // Peer Registry
|
||||
|
||||
datastore ds.Datastore // Local data
|
||||
dslock sync.Mutex
|
||||
datastore ds.ThreadSafeDatastore // Local data
|
||||
|
||||
routingTable *kb.RoutingTable // Array of routing tables for differently distanced nodes
|
||||
providers *ProviderManager
|
||||
@ -53,19 +52,19 @@ type IpfsDHT struct {
|
||||
}
|
||||
|
||||
// NewDHT creates a new DHT object with the given peer as the 'local' host
|
||||
func NewDHT(ctx context.Context, p peer.Peer, ps peer.Peerstore, n inet.Network, dstore ds.Datastore) *IpfsDHT {
|
||||
func NewDHT(ctx context.Context, p peer.ID, n inet.Network, dstore ds.ThreadSafeDatastore) *IpfsDHT {
|
||||
dht := new(IpfsDHT)
|
||||
dht.datastore = dstore
|
||||
dht.self = p
|
||||
dht.peerstore = ps
|
||||
dht.peerstore = n.Peerstore()
|
||||
dht.ContextGroup = ctxgroup.WithContext(ctx)
|
||||
dht.network = n
|
||||
n.SetHandler(inet.ProtocolDHT, dht.handleNewStream)
|
||||
|
||||
dht.providers = NewProviderManager(dht.Context(), p.ID())
|
||||
dht.providers = NewProviderManager(dht.Context(), p)
|
||||
dht.AddChildGroup(dht.providers)
|
||||
|
||||
dht.routingTable = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID()), time.Minute)
|
||||
dht.routingTable = kb.NewRoutingTable(20, kb.ConvertPeerID(p), time.Minute, dht.peerstore)
|
||||
dht.birth = time.Now()
|
||||
|
||||
dht.Validators = make(map[string]ValidatorFunc)
|
||||
@ -79,7 +78,7 @@ func NewDHT(ctx context.Context, p peer.Peer, ps peer.Peerstore, n inet.Network,
|
||||
}
|
||||
|
||||
// Connect to a new peer at the given address, ping and add to the routing table
|
||||
func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.Peer) error {
|
||||
func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.ID) error {
|
||||
if err := dht.network.DialPeer(ctx, npeer); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -95,7 +94,8 @@ func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.Peer) error {
|
||||
}
|
||||
|
||||
// putValueToNetwork stores the given key/value pair at the peer 'p'
|
||||
func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p peer.Peer,
|
||||
// meaning: it sends a PUT_VALUE message to p
|
||||
func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p peer.ID,
|
||||
key string, rec *pb.Record) error {
|
||||
|
||||
pmes := pb.NewMessage(pb.Message_PUT_VALUE, string(key), 0)
|
||||
@ -113,12 +113,13 @@ func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p peer.Peer,
|
||||
|
||||
// putProvider sends a message to peer 'p' saying that the local node
|
||||
// can provide the value of 'key'
|
||||
func (dht *IpfsDHT) putProvider(ctx context.Context, p peer.Peer, key string) error {
|
||||
func (dht *IpfsDHT) putProvider(ctx context.Context, p peer.ID, key string) error {
|
||||
|
||||
pmes := pb.NewMessage(pb.Message_ADD_PROVIDER, string(key), 0)
|
||||
|
||||
// add self as the provider
|
||||
pmes.ProviderPeers = pb.PeersToPBPeers(dht.network, []peer.Peer{dht.self})
|
||||
pi := dht.peerstore.PeerInfo(dht.self)
|
||||
pmes.ProviderPeers = pb.PeerInfosToPBPeers(dht.network, []peer.PeerInfo{pi})
|
||||
|
||||
err := dht.sendMessage(ctx, p, pmes)
|
||||
if err != nil {
|
||||
@ -130,8 +131,12 @@ func (dht *IpfsDHT) putProvider(ctx context.Context, p peer.Peer, key string) er
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.Peer,
|
||||
key u.Key) ([]byte, []peer.Peer, error) {
|
||||
// getValueOrPeers queries a particular peer p for the value for
|
||||
// key. It returns either the value or a list of closer peers.
|
||||
// NOTE: it will update the dht's peerstore with any new addresses
|
||||
// it finds for the given peer.
|
||||
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID,
|
||||
key u.Key) ([]byte, []peer.PeerInfo, error) {
|
||||
|
||||
pmes, err := dht.getValueSingle(ctx, p, key)
|
||||
if err != nil {
|
||||
@ -142,8 +147,8 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.Peer,
|
||||
// Success! We were given the value
|
||||
log.Debug("getValueOrPeers: got value")
|
||||
|
||||
// make sure record is still valid
|
||||
err = dht.verifyRecord(record)
|
||||
// make sure record is valid.
|
||||
err = dht.verifyRecordOnline(ctx, record)
|
||||
if err != nil {
|
||||
log.Error("Received invalid record!")
|
||||
return nil, nil, err
|
||||
@ -151,24 +156,8 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.Peer,
|
||||
return record.GetValue(), nil, nil
|
||||
}
|
||||
|
||||
// TODO decide on providers. This probably shouldn't be happening.
|
||||
if prv := pmes.GetProviderPeers(); prv != nil && len(prv) > 0 {
|
||||
val, err := dht.getFromPeerList(ctx, key, prv)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
log.Debug("getValueOrPeers: get from providers")
|
||||
return val, nil, nil
|
||||
}
|
||||
|
||||
// Perhaps we were given closer peers
|
||||
peers, errs := pb.PBPeersToPeers(dht.peerstore, pmes.GetCloserPeers())
|
||||
for _, err := range errs {
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
}
|
||||
}
|
||||
|
||||
peers := pb.PBPeersToPeerInfos(pmes.GetCloserPeers())
|
||||
if len(peers) > 0 {
|
||||
log.Debug("getValueOrPeers: peers")
|
||||
return nil, peers, nil
|
||||
@ -179,51 +168,16 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.Peer,
|
||||
}
|
||||
|
||||
// getValueSingle simply performs the get value RPC with the given parameters
|
||||
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.Peer,
|
||||
func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID,
|
||||
key u.Key) (*pb.Message, error) {
|
||||
|
||||
pmes := pb.NewMessage(pb.Message_GET_VALUE, string(key), 0)
|
||||
return dht.sendRequest(ctx, p, pmes)
|
||||
}
|
||||
|
||||
// TODO: Im not certain on this implementation, we get a list of peers/providers
|
||||
// from someone what do we do with it? Connect to each of them? randomly pick
|
||||
// one to get the value from? Or just connect to one at a time until we get a
|
||||
// successful connection and request the value from it?
|
||||
func (dht *IpfsDHT) getFromPeerList(ctx context.Context, key u.Key,
|
||||
peerlist []*pb.Message_Peer) ([]byte, error) {
|
||||
|
||||
for _, pinfo := range peerlist {
|
||||
p, err := dht.ensureConnectedToPeer(ctx, pinfo)
|
||||
if err != nil {
|
||||
log.Errorf("getFromPeers error: %s", err)
|
||||
continue
|
||||
}
|
||||
|
||||
pmes, err := dht.getValueSingle(ctx, p, key)
|
||||
if err != nil {
|
||||
log.Errorf("getFromPeers error: %s\n", err)
|
||||
continue
|
||||
}
|
||||
|
||||
if record := pmes.GetRecord(); record != nil {
|
||||
// Success! We were given the value
|
||||
|
||||
err := dht.verifyRecord(record)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
dht.providers.AddProvider(key, p)
|
||||
return record.GetValue(), nil
|
||||
}
|
||||
}
|
||||
return nil, routing.ErrNotFound
|
||||
}
|
||||
|
||||
// getLocal attempts to retrieve the value from the datastore
|
||||
func (dht *IpfsDHT) getLocal(key u.Key) ([]byte, error) {
|
||||
dht.dslock.Lock()
|
||||
defer dht.dslock.Unlock()
|
||||
|
||||
log.Debug("getLocal %s", key)
|
||||
v, err := dht.datastore.Get(key.DsKey())
|
||||
if err != nil {
|
||||
@ -243,7 +197,7 @@ func (dht *IpfsDHT) getLocal(key u.Key) ([]byte, error) {
|
||||
|
||||
// TODO: 'if paranoid'
|
||||
if u.Debug {
|
||||
err = dht.verifyRecord(rec)
|
||||
err = dht.verifyRecordLocally(rec)
|
||||
if err != nil {
|
||||
log.Errorf("local record verify failed: %s", err)
|
||||
return nil, err
|
||||
@ -269,41 +223,40 @@ func (dht *IpfsDHT) putLocal(key u.Key, value []byte) error {
|
||||
|
||||
// Update signals the routingTable to Update its last-seen status
|
||||
// on the given peer.
|
||||
func (dht *IpfsDHT) Update(ctx context.Context, p peer.Peer) {
|
||||
func (dht *IpfsDHT) Update(ctx context.Context, p peer.ID) {
|
||||
log.Event(ctx, "updatePeer", p)
|
||||
dht.routingTable.Update(p)
|
||||
}
|
||||
|
||||
// FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
|
||||
func (dht *IpfsDHT) FindLocal(id peer.ID) (peer.Peer, *kb.RoutingTable) {
|
||||
func (dht *IpfsDHT) FindLocal(id peer.ID) (peer.PeerInfo, *kb.RoutingTable) {
|
||||
p := dht.routingTable.Find(id)
|
||||
if p != nil {
|
||||
return p, dht.routingTable
|
||||
if p != "" {
|
||||
return dht.peerstore.PeerInfo(p), dht.routingTable
|
||||
}
|
||||
return nil, nil
|
||||
return peer.PeerInfo{}, nil
|
||||
}
|
||||
|
||||
// findPeerSingle asks peer 'p' if they know where the peer with id 'id' is
|
||||
func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.Peer, id peer.ID) (*pb.Message, error) {
|
||||
func (dht *IpfsDHT) findPeerSingle(ctx context.Context, p peer.ID, id peer.ID) (*pb.Message, error) {
|
||||
pmes := pb.NewMessage(pb.Message_FIND_NODE, string(id), 0)
|
||||
return dht.sendRequest(ctx, p, pmes)
|
||||
}
|
||||
|
||||
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.Peer, key u.Key) (*pb.Message, error) {
|
||||
func (dht *IpfsDHT) findProvidersSingle(ctx context.Context, p peer.ID, key u.Key) (*pb.Message, error) {
|
||||
pmes := pb.NewMessage(pb.Message_GET_PROVIDERS, string(key), 0)
|
||||
return dht.sendRequest(ctx, p, pmes)
|
||||
}
|
||||
|
||||
func (dht *IpfsDHT) addProviders(key u.Key, pbps []*pb.Message_Peer) []peer.Peer {
|
||||
peers, errs := pb.PBPeersToPeers(dht.peerstore, pbps)
|
||||
for _, err := range errs {
|
||||
log.Errorf("error converting peer: %v", err)
|
||||
}
|
||||
func (dht *IpfsDHT) addProviders(key u.Key, pbps []*pb.Message_Peer) []peer.ID {
|
||||
peers := pb.PBPeersToPeerInfos(pbps)
|
||||
|
||||
var provArr []peer.ID
|
||||
for _, pi := range peers {
|
||||
p := pi.ID
|
||||
|
||||
var provArr []peer.Peer
|
||||
for _, p := range peers {
|
||||
// Dont add outselves to the list
|
||||
if p.ID().Equal(dht.self.ID()) {
|
||||
if p == dht.self {
|
||||
continue
|
||||
}
|
||||
|
||||
@ -316,14 +269,14 @@ func (dht *IpfsDHT) addProviders(key u.Key, pbps []*pb.Message_Peer) []peer.Peer
|
||||
}
|
||||
|
||||
// nearestPeersToQuery returns the routing tables closest peers.
|
||||
func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.Peer {
|
||||
func (dht *IpfsDHT) nearestPeersToQuery(pmes *pb.Message, count int) []peer.ID {
|
||||
key := u.Key(pmes.GetKey())
|
||||
closer := dht.routingTable.NearestPeers(kb.ConvertKey(key), count)
|
||||
return closer
|
||||
}
|
||||
|
||||
// betterPeerToQuery returns nearestPeersToQuery, but iff closer than self.
|
||||
func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, count int) []peer.Peer {
|
||||
func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, count int) []peer.ID {
|
||||
closer := dht.nearestPeersToQuery(pmes, count)
|
||||
|
||||
// no node? nil
|
||||
@ -333,17 +286,17 @@ func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, count int) []peer.Peer
|
||||
|
||||
// == to self? thats bad
|
||||
for _, p := range closer {
|
||||
if p.ID().Equal(dht.self.ID()) {
|
||||
if p == dht.self {
|
||||
log.Error("Attempted to return self! this shouldnt happen...")
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
var filtered []peer.Peer
|
||||
var filtered []peer.ID
|
||||
for _, p := range closer {
|
||||
// must all be closer than self
|
||||
key := u.Key(pmes.GetKey())
|
||||
if !kb.Closer(dht.self.ID(), p.ID(), key) {
|
||||
if !kb.Closer(dht.self, p, key) {
|
||||
filtered = append(filtered, p)
|
||||
}
|
||||
}
|
||||
@ -352,30 +305,13 @@ func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, count int) []peer.Peer
|
||||
return filtered
|
||||
}
|
||||
|
||||
// getPeer searches the peerstore for a peer with the given peer ID
|
||||
func (dht *IpfsDHT) getPeer(id peer.ID) (peer.Peer, error) {
|
||||
p, err := dht.peerstore.FindOrCreate(id)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("Failed to get peer from peerstore: %s", err)
|
||||
log.Error(err)
|
||||
return nil, err
|
||||
}
|
||||
return p, nil
|
||||
}
|
||||
|
||||
func (dht *IpfsDHT) ensureConnectedToPeer(ctx context.Context, pbp *pb.Message_Peer) (peer.Peer, error) {
|
||||
p, err := pb.PBPeerToPeer(dht.peerstore, pbp)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if dht.self.ID().Equal(p.ID()) {
|
||||
return nil, errors.New("attempting to ensure connection to self")
|
||||
func (dht *IpfsDHT) ensureConnectedToPeer(ctx context.Context, p peer.ID) error {
|
||||
if p == dht.self {
|
||||
return errors.New("attempting to ensure connection to self")
|
||||
}
|
||||
|
||||
// dial connection
|
||||
err = dht.network.DialPeer(ctx, p)
|
||||
return p, err
|
||||
return dht.network.DialPeer(ctx, p)
|
||||
}
|
||||
|
||||
//TODO: this should be smarter about which keys it selects.
|
||||
@ -421,14 +357,24 @@ func (dht *IpfsDHT) PingRoutine(t time.Duration) {
|
||||
|
||||
// Bootstrap builds up list of peers by requesting random peer IDs
|
||||
func (dht *IpfsDHT) Bootstrap(ctx context.Context) {
|
||||
id := make([]byte, 16)
|
||||
rand.Read(id)
|
||||
p, err := dht.FindPeer(ctx, peer.ID(id))
|
||||
if err != nil {
|
||||
log.Errorf("Bootstrap peer error: %s", err)
|
||||
}
|
||||
err = dht.network.DialPeer(ctx, p)
|
||||
if err != nil {
|
||||
log.Errorf("Bootstrap peer error: %s", err)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < 10; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
id := make([]byte, 16)
|
||||
rand.Read(id)
|
||||
pi, err := dht.FindPeer(ctx, peer.ID(id))
|
||||
if err != nil {
|
||||
// NOTE: this is not an error. this is expected!
|
||||
log.Errorf("Bootstrap peer error: %s", err)
|
||||
}
|
||||
|
||||
// woah, we got a peer under a random id? it _cannot_ be valid.
|
||||
log.Errorf("dht seemingly found a peer at a random bootstrap id (%s)...", pi)
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
@ -67,7 +67,7 @@ func (dht *IpfsDHT) handleNewMessage(s inet.Stream) {
|
||||
|
||||
// sendRequest sends out a request, but also makes sure to
|
||||
// measure the RTT for latency measurements.
|
||||
func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
|
||||
func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
|
||||
|
||||
log.Debugf("%s dht starting stream", dht.self)
|
||||
s, err := dht.network.NewStream(inet.ProtocolDHT, p)
|
||||
@ -98,13 +98,13 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *pb.Messa
|
||||
return nil, errors.New("no response to request")
|
||||
}
|
||||
|
||||
p.SetLatency(time.Since(start))
|
||||
dht.peerstore.RecordLatency(p, time.Since(start))
|
||||
log.Event(ctx, "dhtReceivedMessage", dht.self, p, rpmes)
|
||||
return rpmes, nil
|
||||
}
|
||||
|
||||
// sendMessage sends out a message
|
||||
func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.Peer, pmes *pb.Message) error {
|
||||
func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message) error {
|
||||
|
||||
log.Debugf("%s dht starting stream", dht.self)
|
||||
s, err := dht.network.NewStream(inet.ProtocolDHT, p)
|
||||
|
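For reference, a minimal sketch of wiring the refactored constructor together (not part of this commit; newLocalDHT is a hypothetical helper that mirrors setupDHT in the dht_test.go changes just below):

    package dht

    import (
        context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
        ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
        dssync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
        ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
        inet "github.com/jbenet/go-ipfs/net"
        peer "github.com/jbenet/go-ipfs/peer"
    )

    // newLocalDHT is a hypothetical helper (not part of this commit) showing the new
    // NewDHT wiring: a peer.ID, a network built over the peerstore, and a
    // thread-safe datastore. NewDHT reads the peerstore from n.Peerstore().
    func newLocalDHT(ctx context.Context, p peer.ID, pstore peer.Peerstore, addr ma.Multiaddr) (*IpfsDHT, error) {
        n, err := inet.NewNetwork(ctx, []ma.Multiaddr{addr}, p, pstore)
        if err != nil {
            return nil, err
        }

        dstore := dssync.MutexWrap(ds.NewMapDatastore())
        return NewDHT(ctx, p, n, dstore), nil
    }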
dht_test.go (299)
@ -2,44 +2,47 @@ package dht
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"math/rand"
|
||||
"sort"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||
|
||||
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
|
||||
dssync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
|
||||
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
||||
|
||||
ci "github.com/jbenet/go-ipfs/crypto"
|
||||
// ci "github.com/jbenet/go-ipfs/crypto"
|
||||
inet "github.com/jbenet/go-ipfs/net"
|
||||
peer "github.com/jbenet/go-ipfs/peer"
|
||||
u "github.com/jbenet/go-ipfs/util"
|
||||
testutil "github.com/jbenet/go-ipfs/util/testutil"
|
||||
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
func randMultiaddr(t *testing.T) ma.Multiaddr {
|
||||
func setupDHT(ctx context.Context, t *testing.T, addr ma.Multiaddr) *IpfsDHT {
|
||||
|
||||
s := fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 10000+rand.Intn(40000))
|
||||
a, err := ma.NewMultiaddr(s)
|
||||
sk, pk, err := testutil.RandKeyPair(512)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
p, err := peer.IDFromPublicKey(pk)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return a
|
||||
}
|
||||
|
||||
func setupDHT(ctx context.Context, t *testing.T, p peer.Peer) *IpfsDHT {
|
||||
peerstore := peer.NewPeerstore()
|
||||
peerstore.AddPrivKey(p, sk)
|
||||
peerstore.AddPubKey(p, pk)
|
||||
peerstore.AddAddress(p, addr)
|
||||
|
||||
n, err := inet.NewNetwork(ctx, p.Addresses(), p, peerstore)
|
||||
n, err := inet.NewNetwork(ctx, []ma.Multiaddr{addr}, p, peerstore)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
d := NewDHT(ctx, p, peerstore, n, ds.NewMapDatastore())
|
||||
dss := dssync.MutexWrap(ds.NewMapDatastore())
|
||||
d := NewDHT(ctx, p, n, dss)
|
||||
|
||||
d.Validators["v"] = func(u.Key, []byte) error {
|
||||
return nil
|
||||
@ -47,77 +50,53 @@ func setupDHT(ctx context.Context, t *testing.T, p peer.Peer) *IpfsDHT {
|
||||
return d
|
||||
}
|
||||
|
||||
func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []peer.Peer, []*IpfsDHT) {
|
||||
var addrs []ma.Multiaddr
|
||||
for i := 0; i < n; i++ {
|
||||
r := rand.Intn(40000)
|
||||
a, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 10000+r))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
addrs = append(addrs, a)
|
||||
}
|
||||
|
||||
var peers []peer.Peer
|
||||
for i := 0; i < n; i++ {
|
||||
p := makePeer(addrs[i])
|
||||
peers = append(peers, p)
|
||||
}
|
||||
|
||||
func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []peer.ID, []*IpfsDHT) {
|
||||
addrs := make([]ma.Multiaddr, n)
|
||||
dhts := make([]*IpfsDHT, n)
|
||||
peers := make([]peer.ID, n)
|
||||
|
||||
for i := 0; i < n; i++ {
|
||||
dhts[i] = setupDHT(ctx, t, peers[i])
|
||||
addrs[i] = testutil.RandLocalTCPAddress()
|
||||
dhts[i] = setupDHT(ctx, t, addrs[i])
|
||||
peers[i] = dhts[i].self
|
||||
}
|
||||
|
||||
return addrs, peers, dhts
|
||||
}
|
||||
|
||||
func makePeerString(t *testing.T, addr string) peer.Peer {
|
||||
maddr, err := ma.NewMultiaddr(addr)
|
||||
if err != nil {
|
||||
func connect(t *testing.T, ctx context.Context, a, b *IpfsDHT) {
|
||||
|
||||
idB := b.self
|
||||
addrB := b.peerstore.Addresses(idB)
|
||||
if len(addrB) == 0 {
|
||||
t.Fatal("peers setup incorrectly: no local address")
|
||||
}
|
||||
|
||||
a.peerstore.AddAddresses(idB, addrB)
|
||||
if err := a.Connect(ctx, idB); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return makePeer(maddr)
|
||||
}
|
||||
|
||||
func makePeer(addr ma.Multiaddr) peer.Peer {
|
||||
sk, pk, err := ci.GenerateKeyPair(ci.RSA, 512)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
p, err := testutil.NewPeerWithKeyPair(sk, pk)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
p.AddAddress(addr)
|
||||
return p
|
||||
}
|
||||
|
||||
func TestPing(t *testing.T) {
|
||||
// t.Skip("skipping test to debug another")
|
||||
ctx := context.Background()
|
||||
|
||||
addrA := randMultiaddr(t)
|
||||
addrB := randMultiaddr(t)
|
||||
addrA := testutil.RandLocalTCPAddress()
|
||||
addrB := testutil.RandLocalTCPAddress()
|
||||
|
||||
peerA := makePeer(addrA)
|
||||
peerB := makePeer(addrB)
|
||||
dhtA := setupDHT(ctx, t, addrA)
|
||||
dhtB := setupDHT(ctx, t, addrB)
|
||||
|
||||
dhtA := setupDHT(ctx, t, peerA)
|
||||
dhtB := setupDHT(ctx, t, peerB)
|
||||
peerA := dhtA.self
|
||||
peerB := dhtB.self
|
||||
|
||||
defer dhtA.Close()
|
||||
defer dhtB.Close()
|
||||
defer dhtA.network.Close()
|
||||
defer dhtB.network.Close()
|
||||
|
||||
if err := dhtA.Connect(ctx, peerB); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// if err := dhtB.Connect(ctx, peerA); err != nil {
|
||||
// t.Fatal(err)
|
||||
// }
|
||||
connect(t, ctx, dhtA, dhtB)
|
||||
|
||||
//Test that we can ping the node
|
||||
ctxT, _ := context.WithTimeout(ctx, 100*time.Millisecond)
|
||||
@ -136,14 +115,16 @@ func TestValueGetSet(t *testing.T) {
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
addrA := randMultiaddr(t)
|
||||
addrB := randMultiaddr(t)
|
||||
addrA := testutil.RandLocalTCPAddress()
|
||||
addrB := testutil.RandLocalTCPAddress()
|
||||
|
||||
peerA := makePeer(addrA)
|
||||
peerB := makePeer(addrB)
|
||||
dhtA := setupDHT(ctx, t, addrA)
|
||||
dhtB := setupDHT(ctx, t, addrB)
|
||||
|
||||
dhtA := setupDHT(ctx, t, peerA)
|
||||
dhtB := setupDHT(ctx, t, peerB)
|
||||
defer dhtA.Close()
|
||||
defer dhtB.Close()
|
||||
defer dhtA.network.Close()
|
||||
defer dhtB.network.Close()
|
||||
|
||||
vf := func(u.Key, []byte) error {
|
||||
return nil
|
||||
@ -151,15 +132,7 @@ func TestValueGetSet(t *testing.T) {
|
||||
dhtA.Validators["v"] = vf
|
||||
dhtB.Validators["v"] = vf
|
||||
|
||||
defer dhtA.Close()
|
||||
defer dhtB.Close()
|
||||
defer dhtA.network.Close()
|
||||
defer dhtB.network.Close()
|
||||
|
||||
err := dhtA.Connect(ctx, peerB)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
connect(t, ctx, dhtA, dhtB)
|
||||
|
||||
ctxT, _ := context.WithTimeout(ctx, time.Second)
|
||||
dhtA.PutValue(ctxT, "/v/hello", []byte("world"))
|
||||
@ -189,7 +162,7 @@ func TestProvides(t *testing.T) {
|
||||
// t.Skip("skipping test to debug another")
|
||||
ctx := context.Background()
|
||||
|
||||
_, peers, dhts := setupDHTS(ctx, 4, t)
|
||||
_, _, dhts := setupDHTS(ctx, 4, t)
|
||||
defer func() {
|
||||
for i := 0; i < 4; i++ {
|
||||
dhts[i].Close()
|
||||
@ -197,22 +170,11 @@ func TestProvides(t *testing.T) {
|
||||
}
|
||||
}()
|
||||
|
||||
err := dhts[0].Connect(ctx, peers[1])
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
connect(t, ctx, dhts[0], dhts[1])
|
||||
connect(t, ctx, dhts[1], dhts[2])
|
||||
connect(t, ctx, dhts[1], dhts[3])
|
||||
|
||||
err = dhts[1].Connect(ctx, peers[2])
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = dhts[1].Connect(ctx, peers[3])
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = dhts[3].putLocal(u.Key("hello"), []byte("world"))
|
||||
err := dhts[3].putLocal(u.Key("hello"), []byte("world"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -227,18 +189,21 @@ func TestProvides(t *testing.T) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
time.Sleep(time.Millisecond * 60)
|
||||
// what is this timeout for? was 60ms before.
|
||||
time.Sleep(time.Millisecond * 6)
|
||||
|
||||
ctxT, _ := context.WithTimeout(ctx, time.Second)
|
||||
provchan := dhts[0].FindProvidersAsync(ctxT, u.Key("hello"), 1)
|
||||
|
||||
after := time.After(time.Second)
|
||||
select {
|
||||
case prov := <-provchan:
|
||||
if prov == nil {
|
||||
if prov.ID == "" {
|
||||
t.Fatal("Got back nil provider")
|
||||
}
|
||||
case <-after:
|
||||
if prov.ID != dhts[3].self {
|
||||
t.Fatal("Got back nil provider")
|
||||
}
|
||||
case <-ctxT.Done():
|
||||
t.Fatal("Did not get a provider back.")
|
||||
}
|
||||
}
|
||||
@ -250,7 +215,7 @@ func TestProvidesAsync(t *testing.T) {
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
_, peers, dhts := setupDHTS(ctx, 4, t)
|
||||
_, _, dhts := setupDHTS(ctx, 4, t)
|
||||
defer func() {
|
||||
for i := 0; i < 4; i++ {
|
||||
dhts[i].Close()
|
||||
@ -258,22 +223,11 @@ func TestProvidesAsync(t *testing.T) {
|
||||
}
|
||||
}()
|
||||
|
||||
err := dhts[0].Connect(ctx, peers[1])
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
connect(t, ctx, dhts[0], dhts[1])
|
||||
connect(t, ctx, dhts[1], dhts[2])
|
||||
connect(t, ctx, dhts[1], dhts[3])
|
||||
|
||||
err = dhts[1].Connect(ctx, peers[2])
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = dhts[1].Connect(ctx, peers[3])
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = dhts[3].putLocal(u.Key("hello"), []byte("world"))
|
||||
err := dhts[3].putLocal(u.Key("hello"), []byte("world"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -297,10 +251,10 @@ func TestProvidesAsync(t *testing.T) {
|
||||
if !ok {
|
||||
t.Fatal("Provider channel was closed...")
|
||||
}
|
||||
if p == nil {
|
||||
if p.ID == "" {
|
||||
t.Fatal("Got back nil provider!")
|
||||
}
|
||||
if !p.ID().Equal(dhts[3].self.ID()) {
|
||||
if p.ID != dhts[3].self {
|
||||
t.Fatalf("got a provider, but not the right one. %s", p)
|
||||
}
|
||||
case <-ctxT.Done():
|
||||
@ -315,7 +269,7 @@ func TestLayeredGet(t *testing.T) {
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
_, peers, dhts := setupDHTS(ctx, 4, t)
|
||||
_, _, dhts := setupDHTS(ctx, 4, t)
|
||||
defer func() {
|
||||
for i := 0; i < 4; i++ {
|
||||
dhts[i].Close()
|
||||
@ -323,22 +277,11 @@ func TestLayeredGet(t *testing.T) {
|
||||
}
|
||||
}()
|
||||
|
||||
err := dhts[0].Connect(ctx, peers[1])
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to connect: %s", err)
|
||||
}
|
||||
connect(t, ctx, dhts[0], dhts[1])
|
||||
connect(t, ctx, dhts[1], dhts[2])
|
||||
connect(t, ctx, dhts[1], dhts[3])
|
||||
|
||||
err = dhts[1].Connect(ctx, peers[2])
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = dhts[1].Connect(ctx, peers[3])
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = dhts[3].putLocal(u.Key("/v/hello"), []byte("world"))
|
||||
err := dhts[3].putLocal(u.Key("/v/hello"), []byte("world"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -377,32 +320,21 @@ func TestFindPeer(t *testing.T) {
|
||||
}
|
||||
}()
|
||||
|
||||
err := dhts[0].Connect(ctx, peers[1])
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = dhts[1].Connect(ctx, peers[2])
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = dhts[1].Connect(ctx, peers[3])
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
connect(t, ctx, dhts[0], dhts[1])
|
||||
connect(t, ctx, dhts[1], dhts[2])
|
||||
connect(t, ctx, dhts[1], dhts[3])
|
||||
|
||||
ctxT, _ := context.WithTimeout(ctx, time.Second)
|
||||
p, err := dhts[0].FindPeer(ctxT, peers[2].ID())
|
||||
p, err := dhts[0].FindPeer(ctxT, peers[2])
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if p == nil {
|
||||
if p.ID == "" {
|
||||
t.Fatal("Failed to find peer.")
|
||||
}
|
||||
|
||||
if !p.ID().Equal(peers[2].ID()) {
|
||||
if p.ID != peers[2] {
|
||||
t.Fatal("Didnt find expected peer.")
|
||||
}
|
||||
}
|
||||
@ -426,25 +358,10 @@ func TestFindPeersConnectedToPeer(t *testing.T) {
|
||||
|
||||
// topology:
|
||||
// 0-1, 1-2, 1-3, 2-3
|
||||
err := dhts[0].Connect(ctx, peers[1])
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = dhts[1].Connect(ctx, peers[2])
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = dhts[1].Connect(ctx, peers[3])
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = dhts[2].Connect(ctx, peers[3])
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
connect(t, ctx, dhts[0], dhts[1])
|
||||
connect(t, ctx, dhts[1], dhts[2])
|
||||
connect(t, ctx, dhts[1], dhts[3])
|
||||
connect(t, ctx, dhts[2], dhts[3])
|
||||
|
||||
// fmt.Println("0 is", peers[0])
|
||||
// fmt.Println("1 is", peers[1])
|
||||
@ -452,13 +369,13 @@ func TestFindPeersConnectedToPeer(t *testing.T) {
|
||||
// fmt.Println("3 is", peers[3])
|
||||
|
||||
ctxT, _ := context.WithTimeout(ctx, time.Second)
|
||||
pchan, err := dhts[0].FindPeersConnectedToPeer(ctxT, peers[2].ID())
|
||||
pchan, err := dhts[0].FindPeersConnectedToPeer(ctxT, peers[2])
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// shouldFind := []peer.Peer{peers[1], peers[3]}
|
||||
found := []peer.Peer{}
|
||||
// shouldFind := []peer.ID{peers[1], peers[3]}
|
||||
found := []peer.PeerInfo{}
|
||||
for nextp := range pchan {
|
||||
found = append(found, nextp)
|
||||
}
|
||||
@ -475,7 +392,7 @@ func TestFindPeersConnectedToPeer(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func testPeerListsMatch(t *testing.T, p1, p2 []peer.Peer) {
|
||||
func testPeerListsMatch(t *testing.T, p1, p2 []peer.ID) {
|
||||
|
||||
if len(p1) != len(p2) {
|
||||
t.Fatal("did not find as many peers as should have", p1, p2)
|
||||
@ -485,11 +402,11 @@ func testPeerListsMatch(t *testing.T, p1, p2 []peer.Peer) {
|
||||
ids2 := make([]string, len(p2))
|
||||
|
||||
for i, p := range p1 {
|
||||
ids1[i] = p.ID().Pretty()
|
||||
ids1[i] = string(p)
|
||||
}
|
||||
|
||||
for i, p := range p2 {
|
||||
ids2[i] = p.ID().Pretty()
|
||||
ids2[i] = string(p)
|
||||
}
|
||||
|
||||
sort.Sort(sort.StringSlice(ids1))
|
||||
@ -514,39 +431,41 @@ func TestConnectCollision(t *testing.T) {
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
addrA := randMultiaddr(t)
|
||||
addrB := randMultiaddr(t)
|
||||
addrA := testutil.RandLocalTCPAddress()
|
||||
addrB := testutil.RandLocalTCPAddress()
|
||||
|
||||
peerA := makePeer(addrA)
|
||||
peerB := makePeer(addrB)
|
||||
dhtA := setupDHT(ctx, t, addrA)
|
||||
dhtB := setupDHT(ctx, t, addrB)
|
||||
|
||||
dhtA := setupDHT(ctx, t, peerA)
|
||||
dhtB := setupDHT(ctx, t, peerB)
|
||||
peerA := dhtA.self
|
||||
peerB := dhtB.self
|
||||
|
||||
done := make(chan struct{})
|
||||
errs := make(chan error)
|
||||
go func() {
|
||||
dhtA.peerstore.AddAddress(peerB, addrB)
|
||||
err := dhtA.Connect(ctx, peerB)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
done <- struct{}{}
|
||||
errs <- err
|
||||
}()
|
||||
go func() {
|
||||
dhtB.peerstore.AddAddress(peerA, addrA)
|
||||
err := dhtB.Connect(ctx, peerA)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
done <- struct{}{}
|
||||
errs <- err
|
||||
}()
|
||||
|
||||
timeout := time.After(time.Second)
|
||||
select {
|
||||
case <-done:
|
||||
case e := <-errs:
|
||||
if e != nil {
|
||||
t.Fatal(e)
|
||||
}
|
||||
case <-timeout:
|
||||
t.Fatal("Timeout received!")
|
||||
}
|
||||
select {
|
||||
case <-done:
|
||||
case e := <-errs:
|
||||
if e != nil {
|
||||
t.Fatal(e)
|
||||
}
|
||||
case <-timeout:
|
||||
t.Fatal("Timeout received!")
|
||||
}
|
||||
@ -555,7 +474,5 @@ func TestConnectCollision(t *testing.T) {
|
||||
dhtB.Close()
|
||||
dhtA.network.Close()
|
||||
dhtB.network.Close()
|
||||
|
||||
<-time.After(200 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
|
diag.go (4)
@@ -32,12 +32,12 @@ func (di *diagInfo) Marshal() []byte {
 func (dht *IpfsDHT) getDiagInfo() *diagInfo {
 	di := new(diagInfo)
 	di.CodeVersion = "github.com/jbenet/go-ipfs"
-	di.ID = dht.self.ID()
+	di.ID = dht.self
 	di.LifeSpan = time.Since(dht.birth)
 	di.Keys = nil // Currently no way to query datastore

 	for _, p := range dht.routingTable.ListPeers() {
-		d := connDiagInfo{p.GetLatency(), p.ID()}
+		d := connDiagInfo{dht.peerstore.LatencyEWMA(p), p}
 		di.Connections = append(di.Connections, d)
 	}
 	return di
ext_test.go (38)
@ -4,19 +4,17 @@ import (
|
||||
"math/rand"
|
||||
"testing"
|
||||
|
||||
crand "crypto/rand"
|
||||
|
||||
inet "github.com/jbenet/go-ipfs/net"
|
||||
mocknet "github.com/jbenet/go-ipfs/net/mock"
|
||||
peer "github.com/jbenet/go-ipfs/peer"
|
||||
routing "github.com/jbenet/go-ipfs/routing"
|
||||
pb "github.com/jbenet/go-ipfs/routing/dht/pb"
|
||||
u "github.com/jbenet/go-ipfs/util"
|
||||
testutil "github.com/jbenet/go-ipfs/util/testutil"
|
||||
|
||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||
ggio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/io"
|
||||
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
|
||||
dssync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
|
||||
|
||||
"time"
|
||||
)
|
||||
@ -34,8 +32,8 @@ func TestGetFailures(t *testing.T) {
|
||||
nets := mn.Nets()
|
||||
peers := mn.Peers()
|
||||
|
||||
ps := peer.NewPeerstore()
|
||||
d := NewDHT(ctx, peers[0], ps, nets[0], ds.NewMapDatastore())
|
||||
tsds := dssync.MutexWrap(ds.NewMapDatastore())
|
||||
d := NewDHT(ctx, peers[0], nets[0], tsds)
|
||||
d.Update(ctx, peers[1])
|
||||
|
||||
// This one should time out
|
||||
@ -126,14 +124,6 @@ func TestGetFailures(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Maybe put these in some sort of "ipfs_testutil" package
|
||||
func _randPeer() peer.Peer {
|
||||
id := make(peer.ID, 16)
|
||||
crand.Read(id)
|
||||
p := testutil.NewPeerWithID(id)
|
||||
return p
|
||||
}
|
||||
|
||||
func TestNotFound(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.SkipNow()
|
||||
@ -146,9 +136,8 @@ func TestNotFound(t *testing.T) {
|
||||
}
|
||||
nets := mn.Nets()
|
||||
peers := mn.Peers()
|
||||
peerstore := peer.NewPeerstore()
|
||||
|
||||
d := NewDHT(ctx, peers[0], peerstore, nets[0], ds.NewMapDatastore())
|
||||
tsds := dssync.MutexWrap(ds.NewMapDatastore())
|
||||
d := NewDHT(ctx, peers[0], nets[0], tsds)
|
||||
|
||||
for _, p := range peers {
|
||||
d.Update(ctx, p)
|
||||
@ -156,6 +145,7 @@ func TestNotFound(t *testing.T) {
|
||||
|
||||
// Reply with random peers to every message
|
||||
for _, neti := range nets {
|
||||
neti := neti // shadow loop var
|
||||
neti.SetHandler(inet.ProtocolDHT, func(s inet.Stream) {
|
||||
defer s.Close()
|
||||
|
||||
@ -171,12 +161,14 @@ func TestNotFound(t *testing.T) {
|
||||
case pb.Message_GET_VALUE:
|
||||
resp := &pb.Message{Type: pmes.Type}
|
||||
|
||||
ps := []peer.Peer{}
|
||||
ps := []peer.PeerInfo{}
|
||||
for i := 0; i < 7; i++ {
|
||||
ps = append(ps, peers[rand.Intn(len(peers))])
|
||||
p := peers[rand.Intn(len(peers))]
|
||||
pi := neti.Peerstore().PeerInfo(p)
|
||||
ps = append(ps, pi)
|
||||
}
|
||||
|
||||
resp.CloserPeers = pb.PeersToPBPeers(d.network, peers)
|
||||
resp.CloserPeers = pb.PeerInfosToPBPeers(d.network, ps)
|
||||
if err := pbw.WriteMsg(resp); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
@ -216,9 +208,9 @@ func TestLessThanKResponses(t *testing.T) {
|
||||
}
|
||||
nets := mn.Nets()
|
||||
peers := mn.Peers()
|
||||
peerstore := peer.NewPeerstore()
|
||||
|
||||
d := NewDHT(ctx, peers[0], peerstore, nets[0], ds.NewMapDatastore())
|
||||
tsds := dssync.MutexWrap(ds.NewMapDatastore())
|
||||
d := NewDHT(ctx, peers[0], nets[0], tsds)
|
||||
|
||||
for i := 1; i < 5; i++ {
|
||||
d.Update(ctx, peers[i])
|
||||
@ -226,6 +218,7 @@ func TestLessThanKResponses(t *testing.T) {
|
||||
|
||||
// Reply with random peers to every message
|
||||
for _, neti := range nets {
|
||||
neti := neti // shadow loop var
|
||||
neti.SetHandler(inet.ProtocolDHT, func(s inet.Stream) {
|
||||
defer s.Close()
|
||||
|
||||
@ -239,9 +232,10 @@ func TestLessThanKResponses(t *testing.T) {
|
||||
|
||||
switch pmes.GetType() {
|
||||
case pb.Message_GET_VALUE:
|
||||
pi := neti.Peerstore().PeerInfo(peers[1])
|
||||
resp := &pb.Message{
|
||||
Type: pmes.Type,
|
||||
CloserPeers: pb.PeersToPBPeers(d.network, []peer.Peer{peers[1]}),
|
||||
CloserPeers: pb.PeerInfosToPBPeers(d.network, []peer.PeerInfo{pi}),
|
||||
}
|
||||
|
||||
if err := pbw.WriteMsg(resp); err != nil {
|
||||
|
handlers.go (110)
@ -17,7 +17,7 @@ import (
|
||||
var CloserPeerCount = 4
|
||||
|
||||
// dhthandler specifies the signature of functions that handle DHT messages.
|
||||
type dhtHandler func(context.Context, peer.Peer, *pb.Message) (*pb.Message, error)
|
||||
type dhtHandler func(context.Context, peer.ID, *pb.Message) (*pb.Message, error)
|
||||
|
||||
func (dht *IpfsDHT) handlerForMsgType(t pb.Message_MessageType) dhtHandler {
|
||||
switch t {
|
||||
@ -38,16 +38,17 @@ func (dht *IpfsDHT) handlerForMsgType(t pb.Message_MessageType) dhtHandler {
|
||||
}
|
||||
}
|
||||
|
||||
func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
|
||||
func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
|
||||
log.Debugf("%s handleGetValue for key: %s\n", dht.self, pmes.GetKey())
|
||||
|
||||
// setup response
|
||||
resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
|
||||
|
||||
// first, is the key even a key?
|
||||
// first, is there even a key?
|
||||
key := pmes.GetKey()
|
||||
if key == "" {
|
||||
return nil, errors.New("handleGetValue but no key was provided")
|
||||
// TODO: send back an error response? could be bad, but the other node's hanging.
|
||||
}
|
||||
|
||||
// let's first check if we have the value locally.
|
||||
@ -85,36 +86,38 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.Peer, pmes *pb.Me
|
||||
|
||||
// if we know any providers for the requested value, return those.
|
||||
provs := dht.providers.GetProviders(ctx, u.Key(pmes.GetKey()))
|
||||
provinfos := peer.PeerInfos(dht.peerstore, provs)
|
||||
if len(provs) > 0 {
|
||||
log.Debugf("handleGetValue returning %d provider[s]", len(provs))
|
||||
resp.ProviderPeers = pb.PeersToPBPeers(dht.network, provs)
|
||||
resp.ProviderPeers = pb.PeerInfosToPBPeers(dht.network, provinfos)
|
||||
}
|
||||
|
||||
// Find closest peer on given cluster to desired key and reply with that info
|
||||
closer := dht.betterPeersToQuery(pmes, CloserPeerCount)
|
||||
closerinfos := peer.PeerInfos(dht.peerstore, closer)
|
||||
if closer != nil {
|
||||
for _, p := range closer {
|
||||
log.Debugf("handleGetValue returning closer peer: '%s'", p)
|
||||
if len(p.Addresses()) < 1 {
|
||||
log.Critical("no addresses on peer being sent!")
|
||||
for _, pi := range closerinfos {
|
||||
log.Debugf("handleGetValue returning closer peer: '%s'", pi.ID)
|
||||
if len(pi.Addrs) < 1 {
|
||||
log.Criticalf(`no addresses on peer being sent!
|
||||
[local:%s]
|
||||
[sending:%s]
|
||||
[remote:%s]`, dht.self, pi.ID, p)
|
||||
}
|
||||
}
|
||||
resp.CloserPeers = pb.PeersToPBPeers(dht.network, closer)
|
||||
|
||||
resp.CloserPeers = pb.PeerInfosToPBPeers(dht.network, closerinfos)
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// Store a value in this peer local storage
|
||||
func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
|
||||
dht.dslock.Lock()
|
||||
defer dht.dslock.Unlock()
|
||||
func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
|
||||
dskey := u.Key(pmes.GetKey()).DsKey()
|
||||
|
||||
err := dht.verifyRecord(pmes.GetRecord())
|
||||
if err != nil {
|
||||
fmt.Println(u.Key(pmes.GetRecord().GetAuthor()))
|
||||
log.Error("Bad dht record in put request")
|
||||
if err := dht.verifyRecordLocally(pmes.GetRecord()); err != nil {
|
||||
log.Errorf("Bad dht record in PUT from: %s. %s", u.Key(pmes.GetRecord().GetAuthor()), err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -128,18 +131,18 @@ func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.Peer, pmes *pb.Me
|
||||
return pmes, err
|
||||
}
|
||||
|
||||
func (dht *IpfsDHT) handlePing(_ context.Context, p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
|
||||
func (dht *IpfsDHT) handlePing(_ context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
|
||||
log.Debugf("%s Responding to ping from %s!\n", dht.self, p)
|
||||
return pmes, nil
|
||||
}
|
||||
|
||||
func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
|
||||
func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
|
||||
resp := pb.NewMessage(pmes.GetType(), "", pmes.GetClusterLevel())
|
||||
var closest []peer.Peer
|
||||
var closest []peer.ID
|
||||
|
||||
// if looking for self... special case where we send it on CloserPeers.
|
||||
if peer.ID(pmes.GetKey()).Equal(dht.self.ID()) {
|
||||
closest = []peer.Peer{dht.self}
|
||||
if peer.ID(pmes.GetKey()) == dht.self {
|
||||
closest = []peer.ID{dht.self}
|
||||
} else {
|
||||
closest = dht.betterPeersToQuery(pmes, CloserPeerCount)
|
||||
}
|
||||
@ -149,22 +152,20 @@ func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.Peer, pmes *pb.Me
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
var withAddresses []peer.Peer
|
||||
for _, p := range closest {
|
||||
if len(p.Addresses()) > 0 {
|
||||
withAddresses = append(withAddresses, p)
|
||||
var withAddresses []peer.PeerInfo
|
||||
closestinfos := peer.PeerInfos(dht.peerstore, closest)
|
||||
for _, pi := range closestinfos {
|
||||
if len(pi.Addrs) > 0 {
|
||||
withAddresses = append(withAddresses, pi)
|
||||
log.Debugf("handleFindPeer: sending back '%s'", pi.ID)
|
||||
}
|
||||
}
|
||||
|
||||
for _, p := range withAddresses {
|
||||
log.Debugf("handleFindPeer: sending back '%s'", p)
|
||||
}
|
||||
|
||||
resp.CloserPeers = pb.PeersToPBPeers(dht.network, withAddresses)
|
||||
resp.CloserPeers = pb.PeerInfosToPBPeers(dht.network, withAddresses)
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
|
||||
func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
|
||||
resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
|
||||
|
||||
// check if we have this value, to add ourselves as provider.
|
||||
@ -183,13 +184,15 @@ func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.Peer, pmes *p
|
||||
}
|
||||
|
||||
if providers != nil && len(providers) > 0 {
|
||||
resp.ProviderPeers = pb.PeersToPBPeers(dht.network, providers)
|
||||
infos := peer.PeerInfos(dht.peerstore, providers)
|
||||
resp.ProviderPeers = pb.PeerInfosToPBPeers(dht.network, infos)
|
||||
}
|
||||
|
||||
// Also send closer peers.
|
||||
closer := dht.betterPeersToQuery(pmes, CloserPeerCount)
|
||||
if closer != nil {
|
||||
resp.CloserPeers = pb.PeersToPBPeers(dht.network, closer)
|
||||
infos := peer.PeerInfos(dht.peerstore, providers)
|
||||
resp.CloserPeers = pb.PeerInfosToPBPeers(dht.network, infos)
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
@ -197,34 +200,35 @@ func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.Peer, pmes *p
|
||||
|
||||
type providerInfo struct {
|
||||
Creation time.Time
|
||||
Value peer.Peer
|
||||
Value peer.ID
|
||||
}
|
||||
|
||||
func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.Peer, pmes *pb.Message) (*pb.Message, error) {
|
||||
func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
|
||||
key := u.Key(pmes.GetKey())
|
||||
|
||||
log.Debugf("%s adding %s as a provider for '%s'\n", dht.self, p, peer.ID(key))
|
||||
|
||||
// add provider should use the address given in the message
|
||||
for _, pb := range pmes.GetProviderPeers() {
|
||||
pid := peer.ID(pb.GetId())
|
||||
if pid.Equal(p.ID()) {
|
||||
|
||||
maddrs, err := pb.Addresses()
|
||||
if err != nil {
|
||||
log.Errorf("provider %s error with addresses %s", p, pb.Addrs)
|
||||
continue
|
||||
}
|
||||
|
||||
log.Infof("received provider %s %s for %s", p, maddrs, key)
|
||||
for _, maddr := range maddrs {
|
||||
p.AddAddress(maddr)
|
||||
}
|
||||
dht.providers.AddProvider(key, p)
|
||||
|
||||
} else {
|
||||
log.Errorf("handleAddProvider received provider %s from %s", pid, p)
|
||||
pinfos := pb.PBPeersToPeerInfos(pmes.GetProviderPeers())
|
||||
for _, pi := range pinfos {
|
||||
if pi.ID != p {
|
||||
// we should ignore this provider reccord! not from originator.
|
||||
// (we chould sign them and check signature later...)
|
||||
log.Errorf("handleAddProvider received provider %s from %s. Ignore.", pi.ID, p)
|
||||
continue
|
||||
}
|
||||
|
||||
if len(pi.Addrs) < 1 {
|
||||
log.Errorf("got no valid addresses for provider %s. Ignore.", p)
|
||||
continue
|
||||
}
|
||||
|
||||
log.Infof("received provider %s for %s (addrs: %s)", p, key, pi.Addrs)
|
||||
for _, maddr := range pi.Addrs {
|
||||
// add the received addresses to our peerstore.
|
||||
dht.peerstore.AddAddress(p, maddr)
|
||||
}
|
||||
dht.providers.AddProvider(key, p)
|
||||
}
|
||||
|
||||
return pmes, nil // send back same msg as confirmation.
|
||||
|
@@ -182,7 +182,7 @@ type Message_Peer struct {
 	// ID of a given peer.
 	Id *string `protobuf:"bytes,1,opt,name=id" json:"id,omitempty"`
 	// multiaddrs for a given peer
-	Addrs []string `protobuf:"bytes,2,rep,name=addrs" json:"addrs,omitempty"`
+	Addrs [][]byte `protobuf:"bytes,2,rep,name=addrs" json:"addrs,omitempty"`
 	// used to signal the sender's connection capabilities to the peer
 	Connection *Message_ConnectionType `protobuf:"varint,3,opt,name=connection,enum=dht.pb.Message_ConnectionType" json:"connection,omitempty"`
 	XXX_unrecognized []byte `json:"-"`
@@ -199,7 +199,7 @@ func (m *Message_Peer) GetId() string {
 	return ""
 }

-func (m *Message_Peer) GetAddrs() []string {
+func (m *Message_Peer) GetAddrs() [][]byte {
 	if m != nil {
 		return m.Addrs
 	}
@@ -32,7 +32,7 @@ message Message {
 	optional string id = 1;

 	// multiaddrs for a given peer
-	repeated string addrs = 2;
+	repeated bytes addrs = 2;

 	// used to signal the sender's connection capabilities to the peer
 	optional ConnectionType connection = 3;
@ -1,15 +1,15 @@
|
||||
package dht_pb
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
||||
|
||||
inet "github.com/jbenet/go-ipfs/net"
|
||||
peer "github.com/jbenet/go-ipfs/peer"
|
||||
eventlog "github.com/jbenet/go-ipfs/util/eventlog"
|
||||
)
|
||||
|
||||
var log = eventlog.Logger("dht.pb")
|
||||
|
||||
// NewMessage constructs a new dht message with given type, key, and level
|
||||
func NewMessage(typ Message_MessageType, key string, level int) *Message {
|
||||
m := &Message{
|
||||
@ -20,43 +20,32 @@ func NewMessage(typ Message_MessageType, key string, level int) *Message {
|
||||
return m
|
||||
}
|
||||
|
||||
func peerToPBPeer(p peer.Peer) *Message_Peer {
|
||||
func peerInfoToPBPeer(p peer.PeerInfo) *Message_Peer {
|
||||
pbp := new(Message_Peer)
|
||||
|
||||
maddrs := p.Addresses()
|
||||
pbp.Addrs = make([]string, len(maddrs))
|
||||
for i, maddr := range maddrs {
|
||||
pbp.Addrs[i] = maddr.String()
|
||||
pbp.Addrs = make([][]byte, len(p.Addrs))
|
||||
for i, maddr := range p.Addrs {
|
||||
pbp.Addrs[i] = maddr.Bytes() // Bytes, not String. Compressed.
|
||||
}
|
||||
pid := string(p.ID())
|
||||
pbp.Id = &pid
|
||||
s := string(p.ID)
|
||||
pbp.Id = &s
|
||||
return pbp
|
||||
}
|
||||
|
||||
// PBPeerToPeer turns a *Message_Peer into its peer.Peer counterpart
|
||||
func PBPeerToPeer(ps peer.Peerstore, pbp *Message_Peer) (peer.Peer, error) {
|
||||
p, err := ps.FindOrCreate(peer.ID(pbp.GetId()))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to get peer from peerstore: %s", err)
|
||||
// PBPeerToPeer turns a *Message_Peer into its peer.PeerInfo counterpart
|
||||
func PBPeerToPeerInfo(pbp *Message_Peer) peer.PeerInfo {
|
||||
return peer.PeerInfo{
|
||||
ID: peer.ID(pbp.GetId()),
|
||||
Addrs: pbp.Addresses(),
|
||||
}
|
||||
|
||||
// add addresses
|
||||
maddrs, err := pbp.Addresses()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Received peer with bad or missing addresses: %s", pbp.Addrs)
|
||||
}
|
||||
for _, maddr := range maddrs {
|
||||
p.AddAddress(maddr)
|
||||
}
|
||||
return p, nil
|
||||
}
|
||||
|
||||
// RawPeersToPBPeers converts a slice of Peers into a slice of *Message_Peers,
|
||||
// RawPeerInfosToPBPeers converts a slice of Peers into a slice of *Message_Peers,
|
||||
// ready to go out on the wire.
|
||||
func RawPeersToPBPeers(peers []peer.Peer) []*Message_Peer {
|
||||
func RawPeerInfosToPBPeers(peers []peer.PeerInfo) []*Message_Peer {
|
||||
pbpeers := make([]*Message_Peer, len(peers))
|
||||
for i, p := range peers {
|
||||
pbpeers[i] = peerToPBPeer(p)
|
||||
pbpeers[i] = peerInfoToPBPeer(p)
|
||||
}
|
||||
return pbpeers
|
||||
}
|
||||
@ -64,49 +53,42 @@ func RawPeersToPBPeers(peers []peer.Peer) []*Message_Peer {
|
||||
// PeersToPBPeers converts given []peer.Peer into a set of []*Message_Peer,
|
||||
// which can be written to a message and sent out. the key thing this function
|
||||
// does (in addition to PeersToPBPeers) is set the ConnectionType with
|
||||
// information from the given inet.Dialer.
|
||||
func PeersToPBPeers(d inet.Network, peers []peer.Peer) []*Message_Peer {
|
||||
pbps := RawPeersToPBPeers(peers)
|
||||
// information from the given inet.Network.
|
||||
func PeerInfosToPBPeers(n inet.Network, peers []peer.PeerInfo) []*Message_Peer {
|
||||
pbps := RawPeerInfosToPBPeers(peers)
|
||||
for i, pbp := range pbps {
|
||||
c := ConnectionType(d.Connectedness(peers[i]))
|
||||
c := ConnectionType(n.Connectedness(peers[i].ID))
|
||||
pbp.Connection = &c
|
||||
}
|
||||
return pbps
|
||||
}
|
||||
|
||||
// PBPeersToPeers converts given []*Message_Peer into a set of []peer.Peer
|
||||
// Returns two slices, one of peers, and one of errors. The slice of peers
|
||||
// will ONLY contain successfully converted peers. The slice of errors contains
|
||||
// whether each input Message_Peer was successfully converted.
|
||||
func PBPeersToPeers(ps peer.Peerstore, pbps []*Message_Peer) ([]peer.Peer, []error) {
|
||||
errs := make([]error, len(pbps))
|
||||
peers := make([]peer.Peer, 0, len(pbps))
|
||||
for i, pbp := range pbps {
|
||||
p, err := PBPeerToPeer(ps, pbp)
|
||||
if err != nil {
|
||||
errs[i] = err
|
||||
} else {
|
||||
peers = append(peers, p)
|
||||
}
|
||||
// PBPeersToPeerInfos converts given []*Message_Peer into []peer.PeerInfo
|
||||
// Invalid addresses will be silently omitted.
|
||||
func PBPeersToPeerInfos(pbps []*Message_Peer) []peer.PeerInfo {
|
||||
peers := make([]peer.PeerInfo, 0, len(pbps))
|
||||
for _, pbp := range pbps {
|
||||
peers = append(peers, PBPeerToPeerInfo(pbp))
|
||||
}
|
||||
return peers, errs
|
||||
return peers
|
||||
}
|
||||
|
||||
// Addresses returns a multiaddr associated with the Message_Peer entry
|
||||
func (m *Message_Peer) Addresses() ([]ma.Multiaddr, error) {
|
||||
func (m *Message_Peer) Addresses() []ma.Multiaddr {
|
||||
if m == nil {
|
||||
return nil, errors.New("MessagePeer is nil")
|
||||
return nil
|
||||
}
|
||||
|
||||
var err error
|
||||
maddrs := make([]ma.Multiaddr, len(m.Addrs))
|
||||
for i, addr := range m.Addrs {
|
||||
maddrs[i], err = ma.NewMultiaddr(addr)
|
||||
maddrs[i], err = ma.NewMultiaddrBytes(addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
log.Error("error decoding Multiaddr for peer: %s", m.GetId())
|
||||
continue
|
||||
}
|
||||
}
|
||||
return maddrs, nil
|
||||
return maddrs
|
||||
}
|
||||
|
||||
// GetClusterLevel gets and adjusts the cluster level on the message.
|
||||
|
providers.go (14)
@@ -23,12 +23,12 @@ type ProviderManager struct {

 type addProv struct {
 	k   u.Key
-	val peer.Peer
+	val peer.ID
 }

 type getProv struct {
 	k    u.Key
-	resp chan []peer.Peer
+	resp chan []peer.ID
 }

 func NewProviderManager(ctx context.Context, local peer.ID) *ProviderManager {
@@ -53,7 +53,7 @@ func (pm *ProviderManager) run() {
 	for {
 		select {
 		case np := <-pm.newprovs:
-			if np.val.ID().Equal(pm.lpeer) {
+			if np.val == pm.lpeer {
 				pm.local[np.k] = struct{}{}
 			}
 			pi := new(providerInfo)
@@ -63,7 +63,7 @@ func (pm *ProviderManager) run() {
 			pm.providers[np.k] = append(arr, pi)

 		case gp := <-pm.getprovs:
-			var parr []peer.Peer
+			var parr []peer.ID
 			provs := pm.providers[gp.k]
 			for _, p := range provs {
 				parr = append(parr, p.Value)
@@ -94,17 +94,17 @@ func (pm *ProviderManager) run() {
 	}
 }

-func (pm *ProviderManager) AddProvider(k u.Key, val peer.Peer) {
+func (pm *ProviderManager) AddProvider(k u.Key, val peer.ID) {
 	pm.newprovs <- &addProv{
 		k:   k,
 		val: val,
 	}
 }

-func (pm *ProviderManager) GetProviders(ctx context.Context, k u.Key) []peer.Peer {
+func (pm *ProviderManager) GetProviders(ctx context.Context, k u.Key) []peer.ID {
 	gp := &getProv{
 		k:    k,
-		resp: make(chan []peer.Peer, 1), // buffered to prevent sender from blocking
+		resp: make(chan []peer.ID, 1), // buffered to prevent sender from blocking
 	}
 	select {
 	case <-ctx.Done():
@@ -3,9 +3,8 @@ package dht
 import (
 	"testing"

-	"github.com/jbenet/go-ipfs/peer"
+	peer "github.com/jbenet/go-ipfs/peer"
 	u "github.com/jbenet/go-ipfs/util"
-	testutil "github.com/jbenet/go-ipfs/util/testutil"

 	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
 )
@@ -15,7 +14,7 @@ func TestProviderManager(t *testing.T) {
 	mid := peer.ID("testing")
 	p := NewProviderManager(ctx, mid)
 	a := u.Key("test")
-	p.AddProvider(a, testutil.NewPeerWithIDString("testingprovider"))
+	p.AddProvider(a, peer.ID("testingprovider"))
 	resp := p.GetProviders(ctx, a)
 	if len(resp) != 1 {
 		t.Fatal("Could not retrieve provider.")
query.go (48)
@ -31,10 +31,10 @@ type dhtQuery struct {
|
||||
}
|
||||
|
||||
type dhtQueryResult struct {
|
||||
value []byte // GetValue
|
||||
peer peer.Peer // FindPeer
|
||||
providerPeers []peer.Peer // GetProviders
|
||||
closerPeers []peer.Peer // *
|
||||
value []byte // GetValue
|
||||
peer peer.PeerInfo // FindPeer
|
||||
providerPeers []peer.PeerInfo // GetProviders
|
||||
closerPeers []peer.PeerInfo // *
|
||||
success bool
|
||||
}
|
||||
|
||||
@ -53,10 +53,10 @@ func newQuery(k u.Key, d inet.Dialer, f queryFunc) *dhtQuery {
|
||||
// - the value
|
||||
// - a list of peers potentially better able to serve the query
|
||||
// - an error
|
||||
type queryFunc func(context.Context, peer.Peer) (*dhtQueryResult, error)
|
||||
type queryFunc func(context.Context, peer.ID) (*dhtQueryResult, error)
|
||||
|
||||
// Run runs the query at hand. pass in a list of peers to use first.
|
||||
func (q *dhtQuery) Run(ctx context.Context, peers []peer.Peer) (*dhtQueryResult, error) {
|
||||
func (q *dhtQuery) Run(ctx context.Context, peers []peer.ID) (*dhtQueryResult, error) {
|
||||
runner := newQueryRunner(ctx, q)
|
||||
return runner.Run(peers)
|
||||
}
|
||||
@ -70,7 +70,7 @@ type dhtQueryRunner struct {
|
||||
peersToQuery *queue.ChanQueue
|
||||
|
||||
// peersSeen are all the peers queried. used to prevent querying same peer 2x
|
||||
peersSeen peer.Map
|
||||
peersSeen peer.Set
|
||||
|
||||
// rateLimit is a channel used to rate limit our processing (semaphore)
|
||||
rateLimit chan struct{}
|
||||
@ -101,12 +101,12 @@ func newQueryRunner(ctx context.Context, q *dhtQuery) *dhtQueryRunner {
|
||||
query: q,
|
||||
peersToQuery: queue.NewChanQueue(ctx, queue.NewXORDistancePQ(q.key)),
|
||||
peersRemaining: todoctr.NewSyncCounter(),
|
||||
peersSeen: peer.Map{},
|
||||
peersSeen: peer.Set{},
|
||||
rateLimit: make(chan struct{}, q.concurrency),
|
||||
}
|
||||
}
|
||||
|
||||
func (r *dhtQueryRunner) Run(peers []peer.Peer) (*dhtQueryResult, error) {
|
||||
func (r *dhtQueryRunner) Run(peers []peer.ID) (*dhtQueryResult, error) {
|
||||
log.Debugf("Run query with %d peers.", len(peers))
|
||||
if len(peers) == 0 {
|
||||
log.Warning("Running query with no peers!")
|
||||
@ -120,7 +120,7 @@ func (r *dhtQueryRunner) Run(peers []peer.Peer) (*dhtQueryResult, error) {
|
||||
|
||||
// add all the peers we got first.
|
||||
for _, p := range peers {
|
||||
r.addPeerToQuery(p, nil) // don't have access to self here...
|
||||
r.addPeerToQuery(p, "") // don't have access to self here...
|
||||
}
|
||||
|
||||
// go do this thing.
|
||||
@ -154,31 +154,30 @@ func (r *dhtQueryRunner) Run(peers []peer.Peer) (*dhtQueryResult, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
func (r *dhtQueryRunner) addPeerToQuery(next peer.Peer, benchmark peer.Peer) {
|
||||
if next == nil {
|
||||
// wtf why are peers nil?!?
|
||||
log.Error("Query getting nil peers!!!\n")
|
||||
return
|
||||
}
|
||||
|
||||
func (r *dhtQueryRunner) addPeerToQuery(next peer.ID, benchmark peer.ID) {
|
||||
// if new peer is ourselves...
|
||||
if next.ID().Equal(r.query.dialer.LocalPeer().ID()) {
|
||||
if next == r.query.dialer.LocalPeer() {
|
||||
return
|
||||
}
|
||||
|
||||
// if new peer further away than whom we got it from, don't bother (loops)
|
||||
if benchmark != nil && kb.Closer(benchmark.ID(), next.ID(), r.query.key) {
|
||||
// TODO----------- this benchmark should be replaced by a heap:
|
||||
// we should be doing the s/kademlia "continue to search"
|
||||
// (i.e. put all of them in a heap sorted by dht distance and then just
|
||||
// pull from the the top until a) you exhaust all peers you get,
|
||||
// b) you succeed, c) your context expires.
|
||||
if benchmark != "" && kb.Closer(benchmark, next, r.query.key) {
|
||||
return
|
||||
}
|
||||
|
||||
// if already seen, no need.
|
||||
r.Lock()
|
||||
_, found := r.peersSeen[next.Key()]
|
||||
_, found := r.peersSeen[next]
|
||||
if found {
|
||||
r.Unlock()
|
||||
return
|
||||
}
|
||||
r.peersSeen[next.Key()] = next
|
||||
r.peersSeen[next] = struct{}{}
|
||||
r.Unlock()
|
||||
|
||||
log.Debugf("adding peer to query: %v\n", next)
|
||||
@ -211,7 +210,7 @@ func (r *dhtQueryRunner) spawnWorkers() {
|
||||
}
|
||||
}
|
||||
|
||||
func (r *dhtQueryRunner) queryPeer(p peer.Peer) {
|
||||
func (r *dhtQueryRunner) queryPeer(p peer.ID) {
|
||||
log.Debugf("spawned worker for: %v", p)
|
||||
|
||||
// make sure we rate limit concurrency.
|
||||
@ -234,7 +233,6 @@ func (r *dhtQueryRunner) queryPeer(p peer.Peer) {
|
||||
}()
|
||||
|
||||
// make sure we're connected to the peer.
|
||||
// (Incidentally, this will add it to the peerstore too)
|
||||
err := r.query.dialer.DialPeer(r.ctx, p)
|
||||
if err != nil {
|
||||
log.Debugf("ERROR worker for: %v -- err connecting: %v", p, err)
|
||||
@ -263,7 +261,9 @@ func (r *dhtQueryRunner) queryPeer(p peer.Peer) {
|
||||
} else if res.closerPeers != nil {
|
||||
log.Debugf("PEERS CLOSER -- worker for: %v", p)
|
||||
for _, next := range res.closerPeers {
|
||||
r.addPeerToQuery(next, p)
|
||||
// add their addresses to the dialer's peerstore
|
||||
r.query.dialer.Peerstore().AddAddresses(next.ID, next.Addrs)
|
||||
r.addPeerToQuery(next.ID, p)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
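Note: because peer.ID is a string-like type, the query runner can use the zero value "" where nil used to mark "no peer", and peer.Set behaves as a membership-only map keyed by ID, as the struct{}{} assignment in the hunk above suggests. A minimal sketch of that dedup pattern, under those assumptions:

package main

import "fmt"

// ID and Set stand in for peer.ID and peer.Set in this sketch.
type ID string
type Set map[ID]struct{}

type runner struct {
	seen Set
}

// addPeer reports whether the ID is being seen for the first time.
func (r *runner) addPeer(next ID) bool {
	if next == "" {
		// the zero-value ID plays the role nil used to play for peer.Peer
		return false
	}
	if _, found := r.seen[next]; found {
		return false
	}
	r.seen[next] = struct{}{}
	return true
}

func main() {
	r := &runner{seen: Set{}}
	fmt.Println(r.addPeer("QmA")) // true: first time
	fmt.Println(r.addPeer("QmA")) // false: already seen
}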
145 records.go

@ -3,15 +3,17 @@ package dht
import (
"bytes"
"errors"
"fmt"
"strings"
"time"

"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"

ci "github.com/jbenet/go-ipfs/crypto"
"github.com/jbenet/go-ipfs/peer"
pb "github.com/jbenet/go-ipfs/routing/dht/pb"
u "github.com/jbenet/go-ipfs/util"
ctxutil "github.com/jbenet/go-ipfs/util/ctx"
)

// ValidatorFunc is a function that is called to validate a given
@ -26,64 +28,163 @@ var ErrBadRecord = errors.New("bad dht record")
// is not found in the Validator map of the DHT.
var ErrInvalidRecordType = errors.New("invalid record keytype")

// KeyForPublicKey returns the key used to retrieve public keys
// from the dht.
func KeyForPublicKey(id peer.ID) u.Key {
return u.Key("/pk/" + string(id))
}

// RecordBlobForSig returns the blob protected by the record signature
func RecordBlobForSig(r *pb.Record) []byte {
k := []byte(r.GetKey())
v := []byte(r.GetValue())
a := []byte(r.GetAuthor())
return bytes.Join([][]byte{k, v, a}, []byte{})
}

// creates and signs a dht record for the given key/value pair
func (dht *IpfsDHT) makePutRecord(key u.Key, value []byte) (*pb.Record, error) {
record := new(pb.Record)

record.Key = proto.String(string(key))
record.Value = value
record.Author = proto.String(string(dht.self.ID()))
blob := bytes.Join([][]byte{[]byte(key), value, []byte(dht.self.ID())}, []byte{})
sig, err := dht.self.PrivKey().Sign(blob)
record.Author = proto.String(string(dht.self))
blob := RecordBlobForSig(record)

sk := dht.peerstore.PrivKey(dht.self)
if sk == nil {
log.Errorf("%s dht cannot get own private key!", dht.self)
return nil, fmt.Errorf("cannot get private key to sign record!")
}

sig, err := sk.Sign(blob)
if err != nil {
return nil, err
}

record.Signature = sig
return record, nil
}
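Note: makePutRecord now derives the signed blob from RecordBlobForSig, i.e. key, value, and author joined byte-for-byte, and signs it with the private key looked up in the peerstore. A self-contained sketch of that sign/verify round trip, using the standard library's ed25519 as a stand-in for the repo's crypto interfaces:

package main

import (
	"bytes"
	"crypto/ed25519"
	"crypto/rand"
	"fmt"
)

// recordBlob joins key, value and author the same way RecordBlobForSig does.
func recordBlob(key, value, author []byte) []byte {
	return bytes.Join([][]byte{key, value, author}, []byte{})
}

func main() {
	pub, priv, err := ed25519.GenerateKey(rand.Reader)
	if err != nil {
		panic(err)
	}

	blob := recordBlob([]byte("/pk/some-peer"), []byte("value-bytes"), []byte("author-id"))
	sig := ed25519.Sign(priv, blob)

	// verification recomputes the same blob and checks the signature,
	// which is the shape of what verifyRecord does with the author's public key.
	fmt.Println(ed25519.Verify(pub, blob, sig)) // true
}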
func (dht *IpfsDHT) getPublicKey(pid peer.ID) (ci.PubKey, error) {
log.Debug("getPublicKey for: %s", pid)
p, err := dht.peerstore.FindOrCreate(pid)
if err == nil {
return p.PubKey(), nil
func (dht *IpfsDHT) getPublicKeyOnline(ctx context.Context, p peer.ID) (ci.PubKey, error) {
log.Debugf("getPublicKey for: %s", p)

// check locally.
pk := dht.peerstore.PubKey(p)
if pk != nil {
return pk, nil
}

log.Debug("not in peerstore, searching dht.")
ctxT, _ := context.WithTimeout(dht.ContextGroup.Context(), time.Second*5)
val, err := dht.GetValue(ctxT, u.Key("/pk/"+string(pid)))
// ok, try the node itself. if they're overwhelmed or slow we can move on.
ctxT, _ := ctxutil.WithDeadlineFraction(ctx, 0.3)
if pk, err := dht.getPublicKeyFromNode(ctx, p); err == nil {
return pk, nil
}

// last ditch effort: let's try the dht.
log.Debugf("pk for %s not in peerstore, and peer failed. trying dht.", p)
pkkey := KeyForPublicKey(p)

// ok, try the node itself. if they're overwhelmed or slow we can move on.
val, err := dht.GetValue(ctxT, pkkey)
if err != nil {
log.Warning("Failed to find requested public key.")
return nil, err
}

pubkey, err := ci.UnmarshalPublicKey(val)
pk, err = ci.UnmarshalPublicKey(val)
if err != nil {
log.Errorf("Failed to unmarshal public key: %s", err)
return nil, err
}
return pubkey, nil
return pk, nil
}

func (dht *IpfsDHT) verifyRecord(r *pb.Record) error {
func (dht *IpfsDHT) getPublicKeyFromNode(ctx context.Context, p peer.ID) (ci.PubKey, error) {

// check locally, just in case...
pk := dht.peerstore.PubKey(p)
if pk != nil {
return pk, nil
}

pkkey := KeyForPublicKey(p)
pmes, err := dht.getValueSingle(ctx, p, pkkey)
if err != nil {
return nil, err
}

// node doesn't have key :(
record := pmes.GetRecord()
if record == nil {
return nil, fmt.Errorf("node not responding with its public key: %s", p)
}

// Success! We were given the value. we don't need to check
// validity because a) we can't. b) we know the hash of the
// key we're looking for.
val := record.GetValue()
log.Debug("dht got a value from other peer.")

pk, err = ci.UnmarshalPublicKey(val)
if err != nil {
return nil, err
}

id, err := peer.IDFromPublicKey(pk)
if err != nil {
return nil, err
}
if id != p {
return nil, fmt.Errorf("public key does not match id: %s", p)
}

// ok! it's valid. we got it!
log.Debugf("dht got public key from node itself.")
return pk, nil
}

// verifyRecordLocally attempts to verify a record. if we do not have the public
// key, we fail. we do not search the dht.
func (dht *IpfsDHT) verifyRecordLocally(r *pb.Record) error {

// First, validate the signature
p, err := dht.peerstore.FindOrCreate(peer.ID(r.GetAuthor()))
p := peer.ID(r.GetAuthor())
pk := dht.peerstore.PubKey(p)
if pk == nil {
return fmt.Errorf("do not have public key for %s", p)
}

return dht.verifyRecord(r, pk)
}

// verifyRecordOnline verifies a record, searching the DHT for the public key
// if necessary. The reason there is a distinction in the functions is that
// retrieving arbitrary public keys from the DHT as a result of passively
// receiving records (e.g. through a PUT_VALUE or ADD_PROVIDER) can cause a
// massive amplification attack on the dht. Use with care.
func (dht *IpfsDHT) verifyRecordOnline(ctx context.Context, r *pb.Record) error {

// get the public key, search for it if necessary.
p := peer.ID(r.GetAuthor())
pk, err := dht.getPublicKeyOnline(ctx, p)
if err != nil {
return err
}
k := u.Key(r.GetKey())

blob := bytes.Join([][]byte{[]byte(k),
r.GetValue(),
[]byte(r.GetAuthor())}, []byte{})
return dht.verifyRecord(r, pk)
}

ok, err := p.PubKey().Verify(blob, r.GetSignature())
func (dht *IpfsDHT) verifyRecord(r *pb.Record, pk ci.PubKey) error {
// First, validate the signature
blob := RecordBlobForSig(r)
ok, err := pk.Verify(blob, r.GetSignature())
if err != nil {
log.Error("Signature verify failed.")
return err
}

if !ok {
log.Error("dht found a forged record! (ignored)")
return ErrBadRecord
}
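Note: getPublicKeyOnline resolves a key through three sources in order: the local peerstore, the peer itself via getPublicKeyFromNode, and finally a DHT lookup under the /pk/ key. Reduced to its control flow, that is a chain of fallbacks; the sketch below uses placeholder closures rather than the repo's API:

package main

import (
	"errors"
	"fmt"
)

type PubKey []byte

// getKey tries each source in order and returns the first hit, which is the
// shape of the peerstore -> node -> DHT fallback in getPublicKeyOnline.
func getKey(sources ...func() (PubKey, error)) (PubKey, error) {
	for _, src := range sources {
		if pk, err := src(); err == nil {
			return pk, nil
		}
	}
	return nil, errors.New("public key not found")
}

func main() {
	fromPeerstore := func() (PubKey, error) { return nil, errors.New("not cached") }
	fromNode := func() (PubKey, error) { return nil, errors.New("peer did not answer") }
	fromDHT := func() (PubKey, error) { return PubKey("pk-bytes"), nil }

	pk, err := getKey(fromPeerstore, fromNode, fromDHT)
	fmt.Println(string(pk), err) // pk-bytes <nil>
}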
120 routing.go

@ -41,7 +41,7 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key u.Key, value []byte) error

peers := dht.routingTable.NearestPeers(kb.ConvertKey(key), KValue)

query := newQuery(key, dht.network, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
query := newQuery(key, dht.network, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
log.Debugf("%s PutValue qry part %v", dht.self, p)
err := dht.putValueToNetwork(ctx, p, string(key), rec)
if err != nil {
@ -61,7 +61,6 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) {
log.Debugf("Get Value [%s]", key)

// If we have it local, dont bother doing an RPC!
// NOTE: this might not be what we want to do...
val, err := dht.getLocal(key)
if err == nil {
log.Debug("Got value locally!")
@ -76,7 +75,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) {
}

// setup the Query
query := newQuery(key, dht.network, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
query := newQuery(key, dht.network, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {

val, peers, err := dht.getValueOrPeers(ctx, p, key)
if err != nil {
@ -131,14 +130,14 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error {
// FindProvidersAsync is the same thing as FindProviders, but returns a channel.
// Peers will be returned on the channel as soon as they are found, even before
// the search query completes.
func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int) <-chan peer.Peer {
func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int) <-chan peer.PeerInfo {
log.Event(ctx, "findProviders", &key)
peerOut := make(chan peer.Peer, count)
peerOut := make(chan peer.PeerInfo, count)
go dht.findProvidersAsyncRoutine(ctx, key, count, peerOut)
return peerOut
}

func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, count int, peerOut chan peer.Peer) {
func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, count int, peerOut chan peer.PeerInfo) {
defer close(peerOut)

ps := pset.NewLimited(count)
@ -147,7 +146,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co
// NOTE: assuming that this list of peers is unique
if ps.TryAdd(p) {
select {
case peerOut <- p:
case peerOut <- dht.peerstore.PeerInfo(p):
case <-ctx.Done():
return
}
@ -160,23 +159,18 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co
}

// setup the Query
query := newQuery(key, dht.network, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
query := newQuery(key, dht.network, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {

pmes, err := dht.findProvidersSingle(ctx, p, key)
if err != nil {
return nil, err
}

provs, errs := pb.PBPeersToPeers(dht.peerstore, pmes.GetProviderPeers())
for _, err := range errs {
if err != nil {
log.Warning(err)
}
}
provs := pb.PBPeersToPeerInfos(pmes.GetProviderPeers())

// Add unique providers from request, up to 'count'
for _, prov := range provs {
if ps.TryAdd(prov) {
if ps.TryAdd(prov.ID) {
select {
case peerOut <- prov:
case <-ctx.Done():
@ -191,13 +185,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co

// Give closer peers back to the query to be queried
closer := pmes.GetCloserPeers()
clpeers, errs := pb.PBPeersToPeers(dht.peerstore, closer)
for _, err := range errs {
if err != nil {
log.Warning(err)
}
}

clpeers := pb.PBPeersToPeerInfos(closer)
return &dhtQueryResult{closerPeers: clpeers}, nil
})

@ -208,62 +196,58 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co
}
}

func (dht *IpfsDHT) addPeerListAsync(ctx context.Context, k u.Key, peers []*pb.Message_Peer, ps *pset.PeerSet, count int, out chan peer.Peer) {
func (dht *IpfsDHT) addPeerListAsync(ctx context.Context, k u.Key, peers []*pb.Message_Peer, ps *pset.PeerSet, count int, out chan peer.PeerInfo) {
var wg sync.WaitGroup
for _, pbp := range peers {
peerInfos := pb.PBPeersToPeerInfos(peers)
for _, pi := range peerInfos {
wg.Add(1)
go func(mp *pb.Message_Peer) {
go func(pi peer.PeerInfo) {
defer wg.Done()
// construct new peer
p, err := dht.ensureConnectedToPeer(ctx, mp)
if err != nil {

p := pi.ID
if err := dht.ensureConnectedToPeer(ctx, p); err != nil {
log.Errorf("%s", err)
return
}
if p == nil {
log.Error("Got nil peer from ensureConnectedToPeer")
return
}

dht.providers.AddProvider(k, p)
if ps.TryAdd(p) {
select {
case out <- p:
case out <- pi:
case <-ctx.Done():
return
}
} else if ps.Size() >= count {
return
}
}(pbp)
}(pi)
}
wg.Wait()
}
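Note: addPeerListAsync now converts the protobuf peers to peer.PeerInfo up front, then fans out one goroutine per peer, forwarding on the output channel only if the limited set accepts the ID and the context is still live. A compact sketch of that bounded fan-out, with a mutex-guarded map standing in for pset.PeerSet:

package main

import (
	"context"
	"fmt"
	"sync"
)

// ID and Info stand in for peer.ID and peer.PeerInfo in this sketch.
type ID string
type Info struct{ ID ID }

func fanOut(ctx context.Context, peers []Info, limit int, out chan<- Info) {
	var (
		mu   sync.Mutex
		seen = map[ID]struct{}{}
		wg   sync.WaitGroup
	)
	for _, pi := range peers {
		wg.Add(1)
		go func(pi Info) {
			defer wg.Done()

			// accept only unseen IDs, and no more than limit of them
			mu.Lock()
			if _, dup := seen[pi.ID]; dup || len(seen) >= limit {
				mu.Unlock()
				return
			}
			seen[pi.ID] = struct{}{}
			mu.Unlock()

			// only forward while the caller is still listening
			select {
			case out <- pi:
			case <-ctx.Done():
			}
		}(pi)
	}
	wg.Wait()
}

func main() {
	out := make(chan Info, 4)
	fanOut(context.Background(), []Info{{"QmA"}, {"QmB"}, {"QmA"}}, 4, out)
	close(out)
	for pi := range out {
		fmt.Println(pi.ID) // QmA and QmB, in some order
	}
}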
// FindPeer searches for a peer with given ID.
func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.Peer, error) {
func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.PeerInfo, error) {

// Check if were already connected to them
p, _ := dht.FindLocal(id)
if p != nil {
return p, nil
if pi, _ := dht.FindLocal(id); pi.ID != "" {
return pi, nil
}

closest := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue)
if closest == nil || len(closest) == 0 {
return nil, kb.ErrLookupFailure
return peer.PeerInfo{}, kb.ErrLookupFailure
}

// Sanity...
for _, p := range closest {
if p.ID().Equal(id) {
if p == id {
log.Error("Found target peer in list of closest peers...")
return p, nil
return dht.peerstore.PeerInfo(p), nil
}
}

// setup the Query
query := newQuery(u.Key(id), dht.network, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
query := newQuery(u.Key(id), dht.network, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {

pmes, err := dht.findPeerSingle(ctx, p, id)
if err != nil {
@ -271,45 +255,40 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.Peer, error)
}

closer := pmes.GetCloserPeers()
clpeers, errs := pb.PBPeersToPeers(dht.peerstore, closer)
for _, err := range errs {
if err != nil {
log.Warning(err)
}
}
clpeerInfos := pb.PBPeersToPeerInfos(closer)

// see it we got the peer here
for _, np := range clpeers {
if string(np.ID()) == string(id) {
for _, npi := range clpeerInfos {
if npi.ID == id {
return &dhtQueryResult{
peer: np,
peer: npi,
success: true,
}, nil
}
}

return &dhtQueryResult{closerPeers: clpeers}, nil
return &dhtQueryResult{closerPeers: clpeerInfos}, nil
})

// run it!
result, err := query.Run(ctx, closest)
if err != nil {
return nil, err
return peer.PeerInfo{}, err
}

log.Debugf("FindPeer %v %v", id, result.success)
if result.peer == nil {
return nil, routing.ErrNotFound
if result.peer.ID == "" {
return peer.PeerInfo{}, routing.ErrNotFound
}

return result.peer, nil
}

// FindPeersConnectedToPeer searches for peers directly connected to a given peer.
func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<-chan peer.Peer, error) {
func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<-chan peer.PeerInfo, error) {

peerchan := make(chan peer.Peer, asyncQueryBuffer)
peersSeen := map[string]peer.Peer{}
peerchan := make(chan peer.PeerInfo, asyncQueryBuffer)
peersSeen := peer.Set{}

closest := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue)
if closest == nil || len(closest) == 0 {
@ -317,42 +296,37 @@ func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<
}

// setup the Query
query := newQuery(u.Key(id), dht.network, func(ctx context.Context, p peer.Peer) (*dhtQueryResult, error) {
query := newQuery(u.Key(id), dht.network, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {

pmes, err := dht.findPeerSingle(ctx, p, id)
if err != nil {
return nil, err
}

var clpeers []peer.Peer
var clpeers []peer.PeerInfo
closer := pmes.GetCloserPeers()
for _, pbp := range closer {
// skip peers already seen
if _, found := peersSeen[string(pbp.GetId())]; found {
continue
}
pi := pb.PBPeerToPeerInfo(pbp)

// skip peers that fail to unmarshal
p, err := pb.PBPeerToPeer(dht.peerstore, pbp)
if err != nil {
log.Warning(err)
// skip peers already seen
if _, found := peersSeen[pi.ID]; found {
continue
}
peersSeen[pi.ID] = struct{}{}

// if peer is connected, send it to our client.
if pb.Connectedness(*pbp.Connection) == inet.Connected {
select {
case <-ctx.Done():
return nil, ctx.Err()
case peerchan <- p:
case peerchan <- pi:
}
}

peersSeen[string(p.ID())] = p

// if peer is the peer we're looking for, don't bother querying it.
// TODO maybe query it?
if pb.Connectedness(*pbp.Connection) != inet.Connected {
clpeers = append(clpeers, p)
clpeers = append(clpeers, pi)
}
}

@ -374,7 +348,7 @@ func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<
}

// Ping a peer, log the time it took
func (dht *IpfsDHT) Ping(ctx context.Context, p peer.Peer) error {
func (dht *IpfsDHT) Ping(ctx context.Context, p peer.ID) error {
// Thoughts: maybe this should accept an ID and do a peer lookup?
log.Debugf("ping %s start", p)
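Note: FindProvidersAsync and FindPeersConnectedToPeer now emit peer.PeerInfo values (an ID plus addresses) on their channels instead of full peer.Peer objects. A caller would typically drain such a channel until it closes or the context ends; the sketch below uses a local PeerInfo stand-in so it runs on its own:

package main

import (
	"context"
	"fmt"
	"time"
)

// PeerInfo mirrors the assumed shape of peer.PeerInfo: an ID plus addresses.
type PeerInfo struct {
	ID    string
	Addrs []string
}

// consume drains a provider channel until it closes or the context ends,
// which is how results from FindProvidersAsync would typically be read.
func consume(ctx context.Context, provs <-chan PeerInfo) {
	for {
		select {
		case pi, ok := <-provs:
			if !ok {
				return // query finished and closed the channel
			}
			fmt.Println("provider:", pi.ID, pi.Addrs)
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	provs := make(chan PeerInfo, 2)
	provs <- PeerInfo{ID: "QmA", Addrs: []string{"/ip4/127.0.0.1/tcp/4001"}}
	provs <- PeerInfo{ID: "QmB"}
	close(provs)

	consume(ctx, provs)
}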