mirror of
https://github.com/fluencelabs/go-libp2p-kad-dht
synced 2025-04-24 22:32:13 +00:00
thread context to prep (#286)
We usually _explicitly_ call connect before calling `prep` but we may disconnect somewhere in-between. We _don't_ want to get stuck here dialing if the context has been canceled.
This commit is contained in:
parent
ae60aa183b
commit
6eafd65baa
20
dht_net.go
20
dht_net.go
@ -111,7 +111,7 @@ func (dht *IpfsDHT) handleNewMessage(s inet.Stream) bool {
|
||||
// measure the RTT for latency measurements.
|
||||
func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
|
||||
|
||||
ms, err := dht.messageSenderForPeer(p)
|
||||
ms, err := dht.messageSenderForPeer(ctx, p)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -133,7 +133,7 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message
|
||||
|
||||
// sendMessage sends out a message
|
||||
func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message) error {
|
||||
ms, err := dht.messageSenderForPeer(p)
|
||||
ms, err := dht.messageSenderForPeer(ctx, p)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -154,7 +154,7 @@ func (dht *IpfsDHT) updateFromMessage(ctx context.Context, p peer.ID, mes *pb.Me
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dht *IpfsDHT) messageSenderForPeer(p peer.ID) (*messageSender, error) {
|
||||
func (dht *IpfsDHT) messageSenderForPeer(ctx context.Context, p peer.ID) (*messageSender, error) {
|
||||
dht.smlk.Lock()
|
||||
ms, ok := dht.strmap[p]
|
||||
if ok {
|
||||
@ -165,7 +165,7 @@ func (dht *IpfsDHT) messageSenderForPeer(p peer.ID) (*messageSender, error) {
|
||||
dht.strmap[p] = ms
|
||||
dht.smlk.Unlock()
|
||||
|
||||
if err := ms.prepOrInvalidate(); err != nil {
|
||||
if err := ms.prepOrInvalidate(ctx); err != nil {
|
||||
dht.smlk.Lock()
|
||||
defer dht.smlk.Unlock()
|
||||
|
||||
@ -209,17 +209,17 @@ func (ms *messageSender) invalidate() {
|
||||
}
|
||||
}
|
||||
|
||||
func (ms *messageSender) prepOrInvalidate() error {
|
||||
func (ms *messageSender) prepOrInvalidate(ctx context.Context) error {
|
||||
ms.lk.Lock()
|
||||
defer ms.lk.Unlock()
|
||||
if err := ms.prep(); err != nil {
|
||||
if err := ms.prep(ctx); err != nil {
|
||||
ms.invalidate()
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ms *messageSender) prep() error {
|
||||
func (ms *messageSender) prep(ctx context.Context) error {
|
||||
if ms.invalid {
|
||||
return fmt.Errorf("message sender has been invalidated")
|
||||
}
|
||||
@ -227,7 +227,7 @@ func (ms *messageSender) prep() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
nstr, err := ms.dht.host.NewStream(ms.dht.ctx, ms.p, ms.dht.protocols...)
|
||||
nstr, err := ms.dht.host.NewStream(ctx, ms.p, ms.dht.protocols...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -249,7 +249,7 @@ func (ms *messageSender) SendMessage(ctx context.Context, pmes *pb.Message) erro
|
||||
defer ms.lk.Unlock()
|
||||
retry := false
|
||||
for {
|
||||
if err := ms.prep(); err != nil {
|
||||
if err := ms.prep(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -285,7 +285,7 @@ func (ms *messageSender) SendRequest(ctx context.Context, pmes *pb.Message) (*pb
|
||||
defer ms.lk.Unlock()
|
||||
retry := false
|
||||
for {
|
||||
if err := ms.prep(); err != nil {
|
||||
if err := ms.prep(ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -466,7 +466,7 @@ func TestInvalidMessageSenderTracking(t *testing.T) {
|
||||
defer dht.Close()
|
||||
|
||||
foo := peer.ID("asdasd")
|
||||
_, err := dht.messageSenderForPeer(foo)
|
||||
_, err := dht.messageSenderForPeer(ctx, foo)
|
||||
if err == nil {
|
||||
t.Fatal("that shouldnt have succeeded")
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user