swap net2 -> net

This commit is contained in:
Juan Batiz-Benet 2015-01-01 12:45:39 -08:00
parent 6bc26e01f3
commit 719cd7c51c
7 changed files with 93 additions and 118 deletions

28
dht.go
View File

@ -10,8 +10,9 @@ import (
"sync" "sync"
"time" "time"
inet "github.com/jbenet/go-ipfs/p2p/net" host "github.com/jbenet/go-ipfs/p2p/host"
peer "github.com/jbenet/go-ipfs/p2p/peer" peer "github.com/jbenet/go-ipfs/p2p/peer"
protocol "github.com/jbenet/go-ipfs/p2p/protocol"
routing "github.com/jbenet/go-ipfs/routing" routing "github.com/jbenet/go-ipfs/routing"
pb "github.com/jbenet/go-ipfs/routing/dht/pb" pb "github.com/jbenet/go-ipfs/routing/dht/pb"
kb "github.com/jbenet/go-ipfs/routing/kbucket" kb "github.com/jbenet/go-ipfs/routing/kbucket"
@ -26,6 +27,8 @@ import (
var log = eventlog.Logger("dht") var log = eventlog.Logger("dht")
var ProtocolDHT protocol.ID = "/ipfs/dht"
const doPinging = false const doPinging = false
// NumBootstrapQueries defines the number of random dht queries to do to // NumBootstrapQueries defines the number of random dht queries to do to
@ -37,7 +40,7 @@ const NumBootstrapQueries = 5
// IpfsDHT is an implementation of Kademlia with Coral and S/Kademlia modifications. // IpfsDHT is an implementation of Kademlia with Coral and S/Kademlia modifications.
// It is used to implement the base IpfsRouting module. // It is used to implement the base IpfsRouting module.
type IpfsDHT struct { type IpfsDHT struct {
network inet.Network // the network services we need host host.Host // the network services we need
self peer.ID // Local peer (yourself) self peer.ID // Local peer (yourself)
peerstore peer.Peerstore // Peer Registry peerstore peer.Peerstore // Peer Registry
@ -56,19 +59,19 @@ type IpfsDHT struct {
} }
// NewDHT creates a new DHT object with the given peer as the 'local' host // NewDHT creates a new DHT object with the given peer as the 'local' host
func NewDHT(ctx context.Context, p peer.ID, n inet.Network, dstore ds.ThreadSafeDatastore) *IpfsDHT { func NewDHT(ctx context.Context, h host.Host, dstore ds.ThreadSafeDatastore) *IpfsDHT {
dht := new(IpfsDHT) dht := new(IpfsDHT)
dht.datastore = dstore dht.datastore = dstore
dht.self = p dht.self = h.ID()
dht.peerstore = n.Peerstore() dht.peerstore = h.Peerstore()
dht.ContextGroup = ctxgroup.WithContext(ctx) dht.ContextGroup = ctxgroup.WithContext(ctx)
dht.network = n dht.host = h
n.SetHandler(inet.ProtocolDHT, dht.handleNewStream) h.SetStreamHandler(ProtocolDHT, dht.handleNewStream)
dht.providers = NewProviderManager(dht.Context(), p) dht.providers = NewProviderManager(dht.Context(), dht.self)
dht.AddChildGroup(dht.providers) dht.AddChildGroup(dht.providers)
dht.routingTable = kb.NewRoutingTable(20, kb.ConvertPeerID(p), time.Minute, dht.peerstore) dht.routingTable = kb.NewRoutingTable(20, kb.ConvertPeerID(dht.self), time.Minute, dht.peerstore)
dht.birth = time.Now() dht.birth = time.Now()
dht.Validators = make(map[string]ValidatorFunc) dht.Validators = make(map[string]ValidatorFunc)
@ -88,7 +91,8 @@ func (dht *IpfsDHT) LocalPeer() peer.ID {
// Connect to a new peer at the given address, ping and add to the routing table // Connect to a new peer at the given address, ping and add to the routing table
func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.ID) error { func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.ID) error {
if err := dht.network.DialPeer(ctx, npeer); err != nil { // TODO: change interface to accept a PeerInfo as well.
if err := dht.host.Connect(ctx, peer.PeerInfo{ID: npeer}); err != nil {
return err return err
} }
@ -127,7 +131,7 @@ func (dht *IpfsDHT) putProvider(ctx context.Context, p peer.ID, key string) erro
// add self as the provider // add self as the provider
pi := dht.peerstore.PeerInfo(dht.self) pi := dht.peerstore.PeerInfo(dht.self)
pmes.ProviderPeers = pb.PeerInfosToPBPeers(dht.network, []peer.PeerInfo{pi}) pmes.ProviderPeers = pb.PeerInfosToPBPeers(dht.host.Network(), []peer.PeerInfo{pi})
err := dht.sendMessage(ctx, p, pmes) err := dht.sendMessage(ctx, p, pmes)
if err != nil { if err != nil {
@ -304,7 +308,7 @@ func (dht *IpfsDHT) ensureConnectedToPeer(ctx context.Context, p peer.ID) error
} }
// dial connection // dial connection
return dht.network.DialPeer(ctx, p) return dht.host.Connect(ctx, peer.PeerInfo{ID: p})
} }
// PingRoutine periodically pings nearest neighbors. // PingRoutine periodically pings nearest neighbors.

View File

@ -74,7 +74,7 @@ func (dht *IpfsDHT) handleNewMessage(s inet.Stream) {
func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) { func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
log.Debugf("%s dht starting stream", dht.self) log.Debugf("%s dht starting stream", dht.self)
s, err := dht.network.NewStream(inet.ProtocolDHT, p) s, err := dht.host.NewStream(ProtocolDHT, p)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -116,7 +116,7 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message
func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message) error { func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message) error {
log.Debugf("%s dht starting stream", dht.self) log.Debugf("%s dht starting stream", dht.self)
s, err := dht.network.NewStream(inet.ProtocolDHT, p) s, err := dht.host.NewStream(ProtocolDHT, p)
if err != nil { if err != nil {
return err return err
} }

View File

@ -14,11 +14,10 @@ import (
dssync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" dssync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
swarmnet "github.com/jbenet/go-ipfs/p2p/net/swarmnet"
peer "github.com/jbenet/go-ipfs/p2p/peer" peer "github.com/jbenet/go-ipfs/p2p/peer"
netutil "github.com/jbenet/go-ipfs/p2p/test/util"
routing "github.com/jbenet/go-ipfs/routing" routing "github.com/jbenet/go-ipfs/routing"
u "github.com/jbenet/go-ipfs/util" u "github.com/jbenet/go-ipfs/util"
testutil "github.com/jbenet/go-ipfs/util/testutil"
) )
var testCaseValues = map[u.Key][]byte{} var testCaseValues = map[u.Key][]byte{}
@ -32,30 +31,11 @@ func init() {
} }
} }
func setupDHT(ctx context.Context, t *testing.T, addr ma.Multiaddr) *IpfsDHT { func setupDHT(ctx context.Context, t *testing.T) *IpfsDHT {
h := netutil.GenHostSwarm(t, ctx)
sk, pk, err := testutil.SeededKeyPair(time.Now().UnixNano())
if err != nil {
t.Fatal(err)
}
p, err := peer.IDFromPublicKey(pk)
if err != nil {
t.Fatal(err)
}
peerstore := peer.NewPeerstore()
peerstore.AddPrivKey(p, sk)
peerstore.AddPubKey(p, pk)
peerstore.AddAddress(p, addr)
n, err := swarmnet.NewNetwork(ctx, []ma.Multiaddr{addr}, p, peerstore)
if err != nil {
t.Fatal(err)
}
dss := dssync.MutexWrap(ds.NewMapDatastore()) dss := dssync.MutexWrap(ds.NewMapDatastore())
d := NewDHT(ctx, p, n, dss) d := NewDHT(ctx, h, dss)
d.Validators["v"] = func(u.Key, []byte) error { d.Validators["v"] = func(u.Key, []byte) error {
return nil return nil
@ -69,9 +49,9 @@ func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []peer
peers := make([]peer.ID, n) peers := make([]peer.ID, n)
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
addrs[i] = testutil.RandLocalTCPAddress() dhts[i] = setupDHT(ctx, t)
dhts[i] = setupDHT(ctx, t, addrs[i])
peers[i] = dhts[i].self peers[i] = dhts[i].self
addrs[i] = dhts[i].peerstore.Addresses(dhts[i].self)[0]
} }
return addrs, peers, dhts return addrs, peers, dhts
@ -116,19 +96,16 @@ func TestPing(t *testing.T) {
// t.Skip("skipping test to debug another") // t.Skip("skipping test to debug another")
ctx := context.Background() ctx := context.Background()
addrA := testutil.RandLocalTCPAddress() dhtA := setupDHT(ctx, t)
addrB := testutil.RandLocalTCPAddress() dhtB := setupDHT(ctx, t)
dhtA := setupDHT(ctx, t, addrA)
dhtB := setupDHT(ctx, t, addrB)
peerA := dhtA.self peerA := dhtA.self
peerB := dhtB.self peerB := dhtB.self
defer dhtA.Close() defer dhtA.Close()
defer dhtB.Close() defer dhtB.Close()
defer dhtA.network.Close() defer dhtA.host.Close()
defer dhtB.network.Close() defer dhtB.host.Close()
connect(t, ctx, dhtA, dhtB) connect(t, ctx, dhtA, dhtB)
@ -149,16 +126,13 @@ func TestValueGetSet(t *testing.T) {
ctx := context.Background() ctx := context.Background()
addrA := testutil.RandLocalTCPAddress() dhtA := setupDHT(ctx, t)
addrB := testutil.RandLocalTCPAddress() dhtB := setupDHT(ctx, t)
dhtA := setupDHT(ctx, t, addrA)
dhtB := setupDHT(ctx, t, addrB)
defer dhtA.Close() defer dhtA.Close()
defer dhtB.Close() defer dhtB.Close()
defer dhtA.network.Close() defer dhtA.host.Close()
defer dhtB.network.Close() defer dhtB.host.Close()
vf := func(u.Key, []byte) error { vf := func(u.Key, []byte) error {
return nil return nil
@ -200,7 +174,7 @@ func TestProvides(t *testing.T) {
defer func() { defer func() {
for i := 0; i < 4; i++ { for i := 0; i < 4; i++ {
dhts[i].Close() dhts[i].Close()
defer dhts[i].network.Close() defer dhts[i].host.Close()
} }
}() }()
@ -268,7 +242,7 @@ func TestBootstrap(t *testing.T) {
defer func() { defer func() {
for i := 0; i < nDHTs; i++ { for i := 0; i < nDHTs; i++ {
dhts[i].Close() dhts[i].Close()
defer dhts[i].network.Close() defer dhts[i].host.Close()
} }
}() }()
@ -312,7 +286,7 @@ func TestProvidesMany(t *testing.T) {
defer func() { defer func() {
for i := 0; i < nDHTs; i++ { for i := 0; i < nDHTs; i++ {
dhts[i].Close() dhts[i].Close()
defer dhts[i].network.Close() defer dhts[i].host.Close()
} }
}() }()
@ -424,7 +398,7 @@ func TestProvidesAsync(t *testing.T) {
defer func() { defer func() {
for i := 0; i < 4; i++ { for i := 0; i < 4; i++ {
dhts[i].Close() dhts[i].Close()
defer dhts[i].network.Close() defer dhts[i].host.Close()
} }
}() }()
@ -478,7 +452,7 @@ func TestLayeredGet(t *testing.T) {
defer func() { defer func() {
for i := 0; i < 4; i++ { for i := 0; i < 4; i++ {
dhts[i].Close() dhts[i].Close()
defer dhts[i].network.Close() defer dhts[i].host.Close()
} }
}() }()
@ -518,7 +492,7 @@ func TestFindPeer(t *testing.T) {
defer func() { defer func() {
for i := 0; i < 4; i++ { for i := 0; i < 4; i++ {
dhts[i].Close() dhts[i].Close()
dhts[i].network.Close() dhts[i].host.Close()
} }
}() }()
@ -554,7 +528,7 @@ func TestFindPeersConnectedToPeer(t *testing.T) {
defer func() { defer func() {
for i := 0; i < 4; i++ { for i := 0; i < 4; i++ {
dhts[i].Close() dhts[i].Close()
dhts[i].network.Close() dhts[i].host.Close()
} }
}() }()
@ -633,11 +607,11 @@ func TestConnectCollision(t *testing.T) {
ctx := context.Background() ctx := context.Background()
addrA := testutil.RandLocalTCPAddress() dhtA := setupDHT(ctx, t)
addrB := testutil.RandLocalTCPAddress() dhtB := setupDHT(ctx, t)
dhtA := setupDHT(ctx, t, addrA) addrA := dhtA.peerstore.Addresses(dhtA.self)[0]
dhtB := setupDHT(ctx, t, addrB) addrB := dhtB.peerstore.Addresses(dhtB.self)[0]
peerA := dhtA.self peerA := dhtA.self
peerB := dhtB.self peerB := dhtB.self
@ -674,7 +648,7 @@ func TestConnectCollision(t *testing.T) {
dhtA.Close() dhtA.Close()
dhtB.Close() dhtB.Close()
dhtA.network.Close() dhtA.host.Close()
dhtB.network.Close() dhtB.host.Close()
} }
} }

View File

@ -1,6 +1,8 @@
package dht package dht
import ( import (
"io"
"io/ioutil"
"math/rand" "math/rand"
"testing" "testing"
@ -29,13 +31,20 @@ func TestGetFailures(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
nets := mn.Nets() hosts := mn.Hosts()
peers := mn.Peers() peers := mn.Peers()
tsds := dssync.MutexWrap(ds.NewMapDatastore()) tsds := dssync.MutexWrap(ds.NewMapDatastore())
d := NewDHT(ctx, peers[0], nets[0], tsds) d := NewDHT(ctx, hosts[0], tsds)
d.Update(ctx, peers[1]) d.Update(ctx, peers[1])
// u.POut("NotFound Test\n")
// Reply with failures to every message
hosts[1].SetStreamHandler(ProtocolDHT, func(s inet.Stream) {
defer s.Close()
io.Copy(ioutil.Discard, s)
})
// This one should time out // This one should time out
// u.POut("Timout Test\n") // u.POut("Timout Test\n")
ctx1, _ := context.WithTimeout(context.Background(), time.Second) ctx1, _ := context.WithTimeout(context.Background(), time.Second)
@ -50,7 +59,7 @@ func TestGetFailures(t *testing.T) {
t.Log("Timeout test passed.") t.Log("Timeout test passed.")
// Reply with failures to every message // Reply with failures to every message
nets[1].SetHandler(inet.ProtocolDHT, func(s inet.Stream) { hosts[1].SetStreamHandler(ProtocolDHT, func(s inet.Stream) {
defer s.Close() defer s.Close()
pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax) pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
@ -97,7 +106,7 @@ func TestGetFailures(t *testing.T) {
} }
// u.POut("handleGetValue Test\n") // u.POut("handleGetValue Test\n")
s, err := nets[1].NewStream(inet.ProtocolDHT, peers[0]) s, err := hosts[1].NewStream(ProtocolDHT, hosts[0].ID())
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -133,19 +142,19 @@ func TestNotFound(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
nets := mn.Nets() hosts := mn.Hosts()
peers := mn.Peers() peers := mn.Peers()
tsds := dssync.MutexWrap(ds.NewMapDatastore()) tsds := dssync.MutexWrap(ds.NewMapDatastore())
d := NewDHT(ctx, peers[0], nets[0], tsds) d := NewDHT(ctx, hosts[0], tsds)
for _, p := range peers { for _, p := range peers {
d.Update(ctx, p) d.Update(ctx, p)
} }
// Reply with random peers to every message // Reply with random peers to every message
for _, neti := range nets { for _, host := range hosts {
neti := neti // shadow loop var host := host // shadow loop var
neti.SetHandler(inet.ProtocolDHT, func(s inet.Stream) { host.SetStreamHandler(ProtocolDHT, func(s inet.Stream) {
defer s.Close() defer s.Close()
pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax) pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
@ -163,11 +172,11 @@ func TestNotFound(t *testing.T) {
ps := []peer.PeerInfo{} ps := []peer.PeerInfo{}
for i := 0; i < 7; i++ { for i := 0; i < 7; i++ {
p := peers[rand.Intn(len(peers))] p := peers[rand.Intn(len(peers))]
pi := neti.Peerstore().PeerInfo(p) pi := host.Peerstore().PeerInfo(p)
ps = append(ps, pi) ps = append(ps, pi)
} }
resp.CloserPeers = pb.PeerInfosToPBPeers(d.network, ps) resp.CloserPeers = pb.PeerInfosToPBPeers(d.host.Network(), ps)
if err := pbw.WriteMsg(resp); err != nil { if err := pbw.WriteMsg(resp); err != nil {
panic(err) panic(err)
} }
@ -205,20 +214,20 @@ func TestLessThanKResponses(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
nets := mn.Nets() hosts := mn.Hosts()
peers := mn.Peers() peers := mn.Peers()
tsds := dssync.MutexWrap(ds.NewMapDatastore()) tsds := dssync.MutexWrap(ds.NewMapDatastore())
d := NewDHT(ctx, peers[0], nets[0], tsds) d := NewDHT(ctx, hosts[0], tsds)
for i := 1; i < 5; i++ { for i := 1; i < 5; i++ {
d.Update(ctx, peers[i]) d.Update(ctx, peers[i])
} }
// Reply with random peers to every message // Reply with random peers to every message
for _, neti := range nets { for _, host := range hosts {
neti := neti // shadow loop var host := host // shadow loop var
neti.SetHandler(inet.ProtocolDHT, func(s inet.Stream) { host.SetStreamHandler(ProtocolDHT, func(s inet.Stream) {
defer s.Close() defer s.Close()
pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax) pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
@ -231,10 +240,10 @@ func TestLessThanKResponses(t *testing.T) {
switch pmes.GetType() { switch pmes.GetType() {
case pb.Message_GET_VALUE: case pb.Message_GET_VALUE:
pi := neti.Peerstore().PeerInfo(peers[1]) pi := host.Peerstore().PeerInfo(peers[1])
resp := &pb.Message{ resp := &pb.Message{
Type: pmes.Type, Type: pmes.Type,
CloserPeers: pb.PeerInfosToPBPeers(d.network, []peer.PeerInfo{pi}), CloserPeers: pb.PeerInfosToPBPeers(d.host.Network(), []peer.PeerInfo{pi}),
} }
if err := pbw.WriteMsg(resp); err != nil { if err := pbw.WriteMsg(resp); err != nil {

View File

@ -89,7 +89,7 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess
provinfos := peer.PeerInfos(dht.peerstore, provs) provinfos := peer.PeerInfos(dht.peerstore, provs)
if len(provs) > 0 { if len(provs) > 0 {
log.Debugf("handleGetValue returning %d provider[s]", len(provs)) log.Debugf("handleGetValue returning %d provider[s]", len(provs))
resp.ProviderPeers = pb.PeerInfosToPBPeers(dht.network, provinfos) resp.ProviderPeers = pb.PeerInfosToPBPeers(dht.host.Network(), provinfos)
} }
// Find closest peer on given cluster to desired key and reply with that info // Find closest peer on given cluster to desired key and reply with that info
@ -106,7 +106,7 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess
} }
} }
resp.CloserPeers = pb.PeerInfosToPBPeers(dht.network, closerinfos) resp.CloserPeers = pb.PeerInfosToPBPeers(dht.host.Network(), closerinfos)
} }
return resp, nil return resp, nil
@ -161,7 +161,7 @@ func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.ID, pmes *pb.Mess
} }
} }
resp.CloserPeers = pb.PeerInfosToPBPeers(dht.network, withAddresses) resp.CloserPeers = pb.PeerInfosToPBPeers(dht.host.Network(), withAddresses)
return resp, nil return resp, nil
} }
@ -185,14 +185,14 @@ func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb.
if providers != nil && len(providers) > 0 { if providers != nil && len(providers) > 0 {
infos := peer.PeerInfos(dht.peerstore, providers) infos := peer.PeerInfos(dht.peerstore, providers)
resp.ProviderPeers = pb.PeerInfosToPBPeers(dht.network, infos) resp.ProviderPeers = pb.PeerInfosToPBPeers(dht.host.Network(), infos)
} }
// Also send closer peers. // Also send closer peers.
closer := dht.betterPeersToQuery(pmes, p, CloserPeerCount) closer := dht.betterPeersToQuery(pmes, p, CloserPeerCount)
if closer != nil { if closer != nil {
infos := peer.PeerInfos(dht.peerstore, providers) infos := peer.PeerInfos(dht.peerstore, providers)
resp.CloserPeers = pb.PeerInfosToPBPeers(dht.network, infos) resp.CloserPeers = pb.PeerInfosToPBPeers(dht.host.Network(), infos)
} }
return resp, nil return resp, nil

View File

@ -3,7 +3,6 @@ package dht
import ( import (
"sync" "sync"
inet "github.com/jbenet/go-ipfs/p2p/net"
peer "github.com/jbenet/go-ipfs/p2p/peer" peer "github.com/jbenet/go-ipfs/p2p/peer"
queue "github.com/jbenet/go-ipfs/p2p/peer/queue" queue "github.com/jbenet/go-ipfs/p2p/peer/queue"
"github.com/jbenet/go-ipfs/routing" "github.com/jbenet/go-ipfs/routing"
@ -18,17 +17,10 @@ import (
var maxQueryConcurrency = AlphaValue var maxQueryConcurrency = AlphaValue
type dhtQuery struct { type dhtQuery struct {
// the key we're querying for dht *IpfsDHT
key u.Key key u.Key // the key we're querying for
qfunc queryFunc // the function to execute per peer
// dialer used to ensure we're connected to peers concurrency int // the concurrency parameter
dialer inet.Dialer
// the function to execute per peer
qfunc queryFunc
// the concurrency parameter
concurrency int
} }
type dhtQueryResult struct { type dhtQueryResult struct {
@ -40,10 +32,10 @@ type dhtQueryResult struct {
} }
// constructs query // constructs query
func newQuery(k u.Key, d inet.Dialer, f queryFunc) *dhtQuery { func (dht *IpfsDHT) newQuery(k u.Key, f queryFunc) *dhtQuery {
return &dhtQuery{ return &dhtQuery{
key: k, key: k,
dialer: d, dht: dht,
qfunc: f, qfunc: f,
concurrency: maxQueryConcurrency, concurrency: maxQueryConcurrency,
} }
@ -155,7 +147,7 @@ func (r *dhtQueryRunner) Run(peers []peer.ID) (*dhtQueryResult, error) {
func (r *dhtQueryRunner) addPeerToQuery(ctx context.Context, next peer.ID) { func (r *dhtQueryRunner) addPeerToQuery(ctx context.Context, next peer.ID) {
// if new peer is ourselves... // if new peer is ourselves...
if next == r.query.dialer.LocalPeer() { if next == r.query.dht.self {
return return
} }
@ -222,10 +214,11 @@ func (r *dhtQueryRunner) queryPeer(cg ctxgroup.ContextGroup, p peer.ID) {
}() }()
// make sure we're connected to the peer. // make sure we're connected to the peer.
if conns := r.query.dialer.ConnsToPeer(p); len(conns) == 0 { if conns := r.query.dht.host.Network().ConnsToPeer(p); len(conns) == 0 {
log.Infof("worker for: %v -- not connected. dial start", p) log.Infof("worker for: %v -- not connected. dial start", p)
if err := r.query.dialer.DialPeer(cg.Context(), p); err != nil { pi := peer.PeerInfo{ID: p}
if err := r.query.dht.host.Connect(cg.Context(), pi); err != nil {
log.Debugf("ERROR worker for: %v -- err connecting: %v", p, err) log.Debugf("ERROR worker for: %v -- err connecting: %v", p, err)
r.Lock() r.Lock()
r.errs = append(r.errs, err) r.errs = append(r.errs, err)
@ -257,12 +250,7 @@ func (r *dhtQueryRunner) queryPeer(cg ctxgroup.ContextGroup, p peer.ID) {
log.Debugf("PEERS CLOSER -- worker for: %v (%d closer peers)", p, len(res.closerPeers)) log.Debugf("PEERS CLOSER -- worker for: %v (%d closer peers)", p, len(res.closerPeers))
for _, next := range res.closerPeers { for _, next := range res.closerPeers {
// add their addresses to the dialer's peerstore // add their addresses to the dialer's peerstore
conns := r.query.dialer.ConnsToPeer(next.ID) r.query.dht.peerstore.AddPeerInfo(next)
if len(conns) == 0 {
log.Infof("PEERS CLOSER -- worker for %v FOUND NEW PEER: %s %s", p, next.ID, next.Addrs)
}
r.query.dialer.Peerstore().AddAddresses(next.ID, next.Addrs)
r.addPeerToQuery(cg.Context(), next.ID) r.addPeerToQuery(cg.Context(), next.ID)
log.Debugf("PEERS CLOSER -- worker for: %v added %v (%v)", p, next.ID, next.Addrs) log.Debugf("PEERS CLOSER -- worker for: %v added %v (%v)", p, next.ID, next.Addrs)
} }

View File

@ -82,7 +82,7 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) {
} }
// setup the Query // setup the Query
query := newQuery(key, dht.network, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) { query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
val, peers, err := dht.getValueOrPeers(ctx, p, key) val, peers, err := dht.getValueOrPeers(ctx, p, key)
if err != nil { if err != nil {
@ -170,7 +170,7 @@ func (dht *IpfsDHT) getClosestPeers(ctx context.Context, key u.Key) (<-chan peer
peerset.Add(p) peerset.Add(p)
} }
query := newQuery(key, dht.network, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) { query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
closer, err := dht.closerPeersSingle(ctx, key, p) closer, err := dht.closerPeersSingle(ctx, key, p)
if err != nil { if err != nil {
log.Errorf("error getting closer peers: %s", err) log.Errorf("error getting closer peers: %s", err)
@ -253,7 +253,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co
} }
// setup the Query // setup the Query
query := newQuery(key, dht.network, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) { query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
pmes, err := dht.findProvidersSingle(ctx, p, key) pmes, err := dht.findProvidersSingle(ctx, p, key)
if err != nil { if err != nil {
@ -312,7 +312,7 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (peer.PeerInfo, er
} }
// setup the Query // setup the Query
query := newQuery(u.Key(id), dht.network, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) { query := dht.newQuery(u.Key(id), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
pmes, err := dht.findPeerSingle(ctx, p, id) pmes, err := dht.findPeerSingle(ctx, p, id)
if err != nil { if err != nil {
@ -361,7 +361,7 @@ func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (<
} }
// setup the Query // setup the Query
query := newQuery(u.Key(id), dht.network, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) { query := dht.newQuery(u.Key(id), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
pmes, err := dht.findPeerSingle(ctx, p, id) pmes, err := dht.findPeerSingle(ctx, p, id)
if err != nil { if err != nil {