mirror of
https://github.com/fluencelabs/go-libp2p-kad-dht
synced 2025-06-25 18:51:38 +00:00
fix(net) pass contexts to dial peer
This commit is contained in:
10
dht.go
10
dht.go
@ -100,7 +100,7 @@ func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.Peer) (peer.Peer, er
|
|||||||
//
|
//
|
||||||
// /ip4/10.20.30.40/tcp/1234/ipfs/Qxhxxchxzcncxnzcnxzcxzm
|
// /ip4/10.20.30.40/tcp/1234/ipfs/Qxhxxchxzcncxnzcnxzcxzm
|
||||||
//
|
//
|
||||||
err := dht.dialer.DialPeer(npeer)
|
err := dht.dialer.DialPeer(ctx, npeer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -311,7 +311,7 @@ func (dht *IpfsDHT) getFromPeerList(ctx context.Context, key u.Key,
|
|||||||
peerlist []*pb.Message_Peer, level int) ([]byte, error) {
|
peerlist []*pb.Message_Peer, level int) ([]byte, error) {
|
||||||
|
|
||||||
for _, pinfo := range peerlist {
|
for _, pinfo := range peerlist {
|
||||||
p, err := dht.ensureConnectedToPeer(pinfo)
|
p, err := dht.ensureConnectedToPeer(ctx, pinfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("getFromPeers error: %s", err)
|
log.Errorf("getFromPeers error: %s", err)
|
||||||
continue
|
continue
|
||||||
@ -496,14 +496,14 @@ func (dht *IpfsDHT) peerFromInfo(pbp *pb.Message_Peer) (peer.Peer, error) {
|
|||||||
return p, nil
|
return p, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dht *IpfsDHT) ensureConnectedToPeer(pbp *pb.Message_Peer) (peer.Peer, error) {
|
func (dht *IpfsDHT) ensureConnectedToPeer(ctx context.Context, pbp *pb.Message_Peer) (peer.Peer, error) {
|
||||||
p, err := dht.peerFromInfo(pbp)
|
p, err := dht.peerFromInfo(pbp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// dial connection
|
// dial connection
|
||||||
err = dht.dialer.DialPeer(p)
|
err = dht.dialer.DialPeer(ctx, p)
|
||||||
return p, err
|
return p, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -556,7 +556,7 @@ func (dht *IpfsDHT) Bootstrap(ctx context.Context) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("Bootstrap peer error: %s", err)
|
log.Error("Bootstrap peer error: %s", err)
|
||||||
}
|
}
|
||||||
err = dht.dialer.DialPeer(p)
|
err = dht.dialer.DialPeer(ctx, p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("Bootstrap peer error: %s", err)
|
log.Errorf("Bootstrap peer error: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -4,7 +4,6 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
crand "crypto/rand"
|
crand "crypto/rand"
|
||||||
|
|
||||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||||
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
|
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
|
||||||
|
|
||||||
@ -82,7 +81,7 @@ type fauxNet struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// DialPeer attempts to establish a connection to a given peer
|
// DialPeer attempts to establish a connection to a given peer
|
||||||
func (f *fauxNet) DialPeer(peer.Peer) error {
|
func (f *fauxNet) DialPeer(context.Context, peer.Peer) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
2
query.go
2
query.go
@ -230,7 +230,7 @@ func (r *dhtQueryRunner) queryPeer(p peer.Peer) {
|
|||||||
|
|
||||||
// make sure we're connected to the peer.
|
// make sure we're connected to the peer.
|
||||||
// (Incidentally, this will add it to the peerstore too)
|
// (Incidentally, this will add it to the peerstore too)
|
||||||
err := r.query.dialer.DialPeer(p)
|
err := r.query.dialer.DialPeer(r.ctx, p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Debugf("ERROR worker for: %v -- err connecting: %v", p, err)
|
log.Debugf("ERROR worker for: %v -- err connecting: %v", p, err)
|
||||||
r.Lock()
|
r.Lock()
|
||||||
|
@ -145,7 +145,7 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int
|
|||||||
log.Error(err)
|
log.Error(err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
dht.addPeerListAsync(key, pmes.GetProviderPeers(), ps, count, peerOut)
|
dht.addPeerListAsync(ctx, key, pmes.GetProviderPeers(), ps, count, peerOut)
|
||||||
}(pp)
|
}(pp)
|
||||||
}
|
}
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
@ -154,13 +154,13 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int
|
|||||||
return peerOut
|
return peerOut
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dht *IpfsDHT) addPeerListAsync(k u.Key, peers []*pb.Message_Peer, ps *peerSet, count int, out chan peer.Peer) {
|
func (dht *IpfsDHT) addPeerListAsync(ctx context.Context, k u.Key, peers []*pb.Message_Peer, ps *peerSet, count int, out chan peer.Peer) {
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
for _, pbp := range peers {
|
for _, pbp := range peers {
|
||||||
go func(mp *pb.Message_Peer) {
|
go func(mp *pb.Message_Peer) {
|
||||||
defer func() { done <- struct{}{} }()
|
defer func() { done <- struct{}{} }()
|
||||||
// construct new peer
|
// construct new peer
|
||||||
p, err := dht.ensureConnectedToPeer(mp)
|
p, err := dht.ensureConnectedToPeer(ctx, mp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("%s", err)
|
log.Error("%s", err)
|
||||||
return
|
return
|
||||||
|
Reference in New Issue
Block a user