Mirror of https://github.com/fluencelabs/go-libp2p-kad-dht (synced 2025-06-27 03:31:39 +00:00)
dht/query: fix important panic
With queries (particularly providers), it was possible to
exit the query runner's Run BEFORE all its children were done,
because the runner itself only listened to the context. This
introduced the possibility of a panic (you can go check it
out by running the TestProvidersMany test on dht_test in commits
before this one). Thankfully, ctxgroup saved the day with
almost _zero_ changes to the sync flow, and now we have the
guarantee that the query runner will only exit if all its
children are done. ❤️
Conflicts:
routing/dht/query.go
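
For context, here is a minimal, standard-library approximation of the guarantee the ctxgroup change provides. The group type, function names, and worker bodies below are illustrative assumptions, not the go-ctxgroup API itself: children are registered with the group, and closing the group returns only once every child has returned, so the runner cannot exit while a worker is still running.

// Minimal standard-library approximation of the guarantee ctxgroup gives the
// query runner (illustrative only; not the go-ctxgroup API itself).
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// group is a toy stand-in for a context group: a cancellable context plus a
// WaitGroup that tracks every child started through it.
type group struct {
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup
}

func newGroup(parent context.Context) *group {
	ctx, cancel := context.WithCancel(parent)
	return &group{ctx: ctx, cancel: cancel}
}

// addChildFunc runs f as a tracked child of the group.
func (g *group) addChildFunc(f func(ctx context.Context)) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		f(g.ctx)
	}()
}

// close cancels the context and, crucially, blocks until all children return.
func (g *group) close() {
	g.cancel()
	g.wg.Wait()
}

func main() {
	g := newGroup(context.Background())
	for i := 0; i < 3; i++ {
		i := i
		g.addChildFunc(func(ctx context.Context) {
			<-ctx.Done()                      // worker notices shutdown...
			time.Sleep(50 * time.Millisecond) // ...and takes time to wind down
			fmt.Println("worker", i, "done")
		})
	}
	g.close() // cancel everyone, then wait for them
	fmt.Println("runner exits only now, after every worker has returned")
}

In the diff, this role is played by ctxgroup: spawnWorkers and each queryPeer call run as children of r.cg, so Run's r.cg.Close() cannot return until all of them have exited.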
dht_net.go (16 lines changed)

@@ -7,6 +7,7 @@ import (
 	inet "github.com/jbenet/go-ipfs/net"
 	peer "github.com/jbenet/go-ipfs/peer"
 	pb "github.com/jbenet/go-ipfs/routing/dht/pb"
+	ctxutil "github.com/jbenet/go-ipfs/util/ctx"
 
 	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
 	ggio "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/gogoprotobuf/io"
@@ -21,8 +22,10 @@ func (dht *IpfsDHT) handleNewMessage(s inet.Stream) {
 	defer s.Close()
 
 	ctx := dht.Context()
-	r := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
-	w := ggio.NewDelimitedWriter(s)
+	cr := ctxutil.NewReader(ctx, s) // ok to use. we defer close stream in this func
+	cw := ctxutil.NewWriter(ctx, s) // ok to use. we defer close stream in this func
+	r := ggio.NewDelimitedReader(cr, inet.MessageSizeMax)
+	w := ggio.NewDelimitedWriter(cw)
 	mPeer := s.Conn().RemotePeer()
 
 	// receive msg
@@ -76,8 +79,10 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message
 	}
 	defer s.Close()
 
-	r := ggio.NewDelimitedReader(s, inet.MessageSizeMax)
-	w := ggio.NewDelimitedWriter(s)
+	cr := ctxutil.NewReader(ctx, s) // ok to use. we defer close stream in this func
+	cw := ctxutil.NewWriter(ctx, s) // ok to use. we defer close stream in this func
+	r := ggio.NewDelimitedReader(cr, inet.MessageSizeMax)
+	w := ggio.NewDelimitedWriter(cw)
 
 	start := time.Now()
 
@@ -113,7 +118,8 @@ func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message
 	}
 	defer s.Close()
 
-	w := ggio.NewDelimitedWriter(s)
+	cw := ctxutil.NewWriter(ctx, s) // ok to use. we defer close stream in this func
+	w := ggio.NewDelimitedWriter(cw)
 
 	log.Debugf("%s writing", dht.self)
 	if err := w.WriteMsg(pmes); err != nil {
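
The dht_net.go change swaps the raw stream for context-aware reader/writer wrappers, so a read or write blocked on a dead peer unblocks once the handler's context is cancelled. The sketch below shows roughly what such a wrapper could look like; the package name ctxio and the ctxReader type are assumptions for illustration, not the actual go-ipfs util/ctx code, though it keeps the same NewReader(ctx, r) shape used in the diff.

// Hypothetical sketch of a context-aware reader in the spirit of the
// ctxutil.NewReader call above (NOT the actual go-ipfs util/ctx code):
// a Read blocked on a dead peer returns once ctx is cancelled.
package ctxio

import (
	"context"
	"io"
)

type ctxReader struct {
	ctx context.Context
	r   io.Reader
}

// NewReader wraps r so that Read gives up with ctx.Err() once ctx is done.
func NewReader(ctx context.Context, r io.Reader) io.Reader {
	return &ctxReader{ctx: ctx, r: r}
}

func (c *ctxReader) Read(p []byte) (int, error) {
	type result struct {
		n   int
		err error
	}
	done := make(chan result, 1)
	go func() {
		n, err := c.r.Read(p) // may block indefinitely on the raw stream
		done <- result{n, err}
	}()
	select {
	case res := <-done:
		return res.n, res.err
	case <-c.ctx.Done():
		// Caveat: the pending Read may still complete later and write into p.
		// That is only safe if the caller abandons the stream afterwards,
		// which is why the diff comments note "ok to use. we defer close
		// stream in this func".
		return 0, c.ctx.Err()
	}
}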
query.go (58 lines changed)

@@ -12,6 +12,7 @@ import (
 	todoctr "github.com/jbenet/go-ipfs/util/todocounter"
 
 	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
+	ctxgroup "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-ctxgroup"
 )
 
 var maxQueryConcurrency = AlphaValue
@@ -78,9 +79,8 @@ type dhtQueryRunner struct {
 	// peersRemaining is a counter of peers remaining (toQuery + processing)
 	peersRemaining todoctr.Counter
 
-	// context
-	ctx    context.Context
-	cancel context.CancelFunc
+	// context group
+	cg ctxgroup.ContextGroup
 
 	// result
 	result *dhtQueryResult
@@ -93,16 +93,13 @@ type dhtQueryRunner struct {
 }
 
 func newQueryRunner(ctx context.Context, q *dhtQuery) *dhtQueryRunner {
-	ctx, cancel := context.WithCancel(ctx)
-
 	return &dhtQueryRunner{
-		ctx:            ctx,
-		cancel:         cancel,
 		query:          q,
 		peersToQuery:   queue.NewChanQueue(ctx, queue.NewXORDistancePQ(q.key)),
 		peersRemaining: todoctr.NewSyncCounter(),
 		peersSeen:      peer.Set{},
 		rateLimit:      make(chan struct{}, q.concurrency),
+		cg:             ctxgroup.WithContext(ctx),
 	}
 }
 
@@ -120,11 +117,13 @@ func (r *dhtQueryRunner) Run(peers []peer.ID) (*dhtQueryResult, error) {
 
 	// add all the peers we got first.
 	for _, p := range peers {
-		r.addPeerToQuery(p, "") // don't have access to self here...
+		r.addPeerToQuery(r.cg.Context(), p, "") // don't have access to self here...
 	}
 
 	// go do this thing.
-	go r.spawnWorkers()
+	// do it as a child func to make sure Run exits
+	// ONLY AFTER spawn workers has exited.
+	r.cg.AddChildFunc(r.spawnWorkers)
 
 	// so workers are working.
 
@@ -133,7 +132,7 @@ func (r *dhtQueryRunner) Run(peers []peer.ID) (*dhtQueryResult, error) {
 
 	select {
 	case <-r.peersRemaining.Done():
-		r.cancel() // ran all and nothing. cancel all outstanding workers.
+		r.cg.Close()
 		r.RLock()
 		defer r.RUnlock()
 
@@ -141,10 +140,10 @@ func (r *dhtQueryRunner) Run(peers []peer.ID) (*dhtQueryResult, error) {
 			err = r.errs[0]
 		}
 
-	case <-r.ctx.Done():
+	case <-r.cg.Closed():
 		r.RLock()
 		defer r.RUnlock()
-		err = r.ctx.Err()
+		err = r.cg.Context().Err() // collect the error.
 	}
 
 	if r.result != nil && r.result.success {
@@ -154,7 +153,7 @@ func (r *dhtQueryRunner) Run(peers []peer.ID) (*dhtQueryResult, error) {
 	return nil, err
 }
 
-func (r *dhtQueryRunner) addPeerToQuery(next peer.ID, benchmark peer.ID) {
+func (r *dhtQueryRunner) addPeerToQuery(ctx context.Context, next peer.ID, benchmark peer.ID) {
 	// if new peer is ourselves...
 	if next == r.query.dialer.LocalPeer() {
 		return
@@ -186,37 +185,42 @@ func (r *dhtQueryRunner) addPeerToQuery(next peer.ID, benchmark peer.ID) {
 	r.peersRemaining.Increment(1)
 	select {
 	case r.peersToQuery.EnqChan <- next:
-	case <-r.ctx.Done():
+	case <-ctx.Done():
 	}
 }
 
-func (r *dhtQueryRunner) spawnWorkers() {
+func (r *dhtQueryRunner) spawnWorkers(parent ctxgroup.ContextGroup) {
 	for {
 
 		select {
 		case <-r.peersRemaining.Done():
 			return
 
-		case <-r.ctx.Done():
+		case <-r.cg.Closing():
 			return
 
 		case p, more := <-r.peersToQuery.DeqChan:
 			if !more {
 				return // channel closed.
 			}
-			log.Debugf("spawning worker for: %v\n", p)
-			go r.queryPeer(p)
+			log.Debugf("spawning worker for: %v", p)
+
+			// do it as a child func to make sure Run exits
+			// ONLY AFTER spawn workers has exited.
+			parent.AddChildFunc(func(cg ctxgroup.ContextGroup) {
+				r.queryPeer(cg, p)
+			})
		}
	}
}
 
-func (r *dhtQueryRunner) queryPeer(p peer.ID) {
+func (r *dhtQueryRunner) queryPeer(cg ctxgroup.ContextGroup, p peer.ID) {
 	log.Debugf("spawned worker for: %v", p)
 
 	// make sure we rate limit concurrency.
 	select {
 	case <-r.rateLimit:
-	case <-r.ctx.Done():
+	case <-cg.Closing():
 		r.peersRemaining.Decrement(1)
 		return
 	}
@@ -233,7 +237,7 @@ func (r *dhtQueryRunner) queryPeer(p peer.ID) {
 	}()
 
 	// make sure we're connected to the peer.
-	err := r.query.dialer.DialPeer(r.ctx, p)
+	err := r.query.dialer.DialPeer(cg.Context(), p)
 	if err != nil {
 		log.Debugf("ERROR worker for: %v -- err connecting: %v", p, err)
 		r.Lock()
@@ -243,7 +247,7 @@ func (r *dhtQueryRunner) queryPeer(p peer.ID) {
 	}
 
 	// finally, run the query against this peer
-	res, err := r.query.qfunc(r.ctx, p)
+	res, err := r.query.qfunc(cg.Context(), p)
 
 	if err != nil {
 		log.Debugf("ERROR worker for: %v %v", p, err)
@@ -256,14 +260,20 @@ func (r *dhtQueryRunner) queryPeer(p peer.ID) {
 		r.Lock()
 		r.result = res
 		r.Unlock()
-		r.cancel() // signal to everyone that we're done.
+		go r.cg.Close() // signal to everyone that we're done.
+		// must be async, as we're one of the children, and Close blocks.
 
 	} else if len(res.closerPeers) > 0 {
 		log.Debugf("PEERS CLOSER -- worker for: %v (%d closer peers)", p, len(res.closerPeers))
 		for _, next := range res.closerPeers {
 			// add their addresses to the dialer's peerstore
+			conns := r.query.dialer.ConnsToPeer(next.ID)
+			if len(conns) == 0 {
+				log.Infof("PEERS CLOSER -- worker for %v FOUND NEW PEER: %s %s", p, next.ID, next.Addrs)
+			}
+
 			r.query.dialer.Peerstore().AddAddresses(next.ID, next.Addrs)
-			r.addPeerToQuery(next.ID, p)
+			r.addPeerToQuery(cg.Context(), next.ID, p)
 			log.Debugf("PEERS CLOSER -- worker for: %v added %v (%v)", p, next.ID, next.Addrs)
 		}
 	} else {
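
One subtlety in the last hunk above: the success path calls go r.cg.Close() rather than r.cg.Close(). Close only returns once every child of the group has exited, and queryPeer itself runs as one of those children, so a synchronous Close here would wait on itself. A stand-alone illustration of that shape, with a plain WaitGroup standing in for the context group (purely illustrative, not the ctxgroup code):

// Why the diff uses "go r.cg.Close()": shutdown waits for every child, and
// this call site is inside a child, so doing it synchronously would deadlock.
package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	done := make(chan struct{})

	wg.Add(1)
	go func() { // a "child" worker that has just found the result
		defer wg.Done()
		// Calling wg.Wait() right here would deadlock: Wait cannot return
		// until this child calls Done, which only happens after we return.
		go func() { // so shutdown is kicked off asynchronously, like go r.cg.Close()
			wg.Wait()
			close(done)
		}()
	}()

	wg.Wait() // the "runner": returns only after the child has finished
	<-done
	fmt.Println("clean shutdown: no child ever blocked waiting on itself")
}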