mirror of
https://github.com/fluencelabs/go-libp2p-kad-dht
synced 2025-04-25 14:52:14 +00:00
park waiters in slice; revise closure logic.
This commit is contained in:
parent
bf4b91ce4b
commit
74d22f3f5e
@ -61,7 +61,7 @@ type waitingCh struct {
|
|||||||
// Dialler throttling (e.g. FD limit exceeded) is a concern, as we can easily spin up more workers to compensate, and
|
// Dialler throttling (e.g. FD limit exceeded) is a concern, as we can easily spin up more workers to compensate, and
|
||||||
// end up adding fuel to the fire. Since we have no deterministic way to detect this for now, we hard-limit concurrency
|
// end up adding fuel to the fire. Since we have no deterministic way to detect this for now, we hard-limit concurrency
|
||||||
// to DialQueueMaxParallelism.
|
// to DialQueueMaxParallelism.
|
||||||
func newDialQueue(ctx context.Context, target string, in *queue.ChanQueue, dialFn func(context.Context, peer.ID) error, nConsumers int) *dialQueue {
|
func newDialQueue(ctx context.Context, target string, in *queue.ChanQueue, dialFn func(context.Context, peer.ID) error) *dialQueue {
|
||||||
sq := &dialQueue{
|
sq := &dialQueue{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
dialFn: dialFn,
|
dialFn: dialFn,
|
||||||
@ -71,9 +71,9 @@ func newDialQueue(ctx context.Context, target string, in *queue.ChanQueue, dialF
|
|||||||
in: in,
|
in: in,
|
||||||
out: queue.NewChanQueue(ctx, queue.NewXORDistancePQ(target)),
|
out: queue.NewChanQueue(ctx, queue.NewXORDistancePQ(target)),
|
||||||
|
|
||||||
growCh: make(chan struct{}, nConsumers),
|
growCh: make(chan struct{}, 1),
|
||||||
shrinkCh: make(chan struct{}, 1),
|
shrinkCh: make(chan struct{}, 1),
|
||||||
waitingCh: make(chan waitingCh, nConsumers),
|
waitingCh: make(chan waitingCh),
|
||||||
dieCh: make(chan struct{}, DialQueueMaxParallelism),
|
dieCh: make(chan struct{}, DialQueueMaxParallelism),
|
||||||
}
|
}
|
||||||
for i := 0; i < DialQueueMinParallelism; i++ {
|
for i := 0; i < DialQueueMinParallelism; i++ {
|
||||||
@ -85,22 +85,16 @@ func newDialQueue(ctx context.Context, target string, in *queue.ChanQueue, dialF
|
|||||||
|
|
||||||
func (dq *dialQueue) control() {
|
func (dq *dialQueue) control() {
|
||||||
var (
|
var (
|
||||||
p peer.ID
|
dialled <-chan peer.ID
|
||||||
dialled = dq.out.DeqChan
|
waiting []waitingCh
|
||||||
resp waitingCh
|
|
||||||
waiting <-chan waitingCh
|
|
||||||
lastScalingEvt = time.Now()
|
lastScalingEvt = time.Now()
|
||||||
)
|
)
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
// close channels.
|
for _, w := range waiting {
|
||||||
if resp.ch != nil {
|
|
||||||
close(resp.ch)
|
|
||||||
}
|
|
||||||
close(dq.waitingCh)
|
|
||||||
for w := range dq.waitingCh {
|
|
||||||
close(w.ch)
|
close(w.ch)
|
||||||
}
|
}
|
||||||
|
waiting = nil
|
||||||
}()
|
}()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
@ -109,16 +103,23 @@ func (dq *dialQueue) control() {
|
|||||||
select {
|
select {
|
||||||
case <-dq.ctx.Done():
|
case <-dq.ctx.Done():
|
||||||
return
|
return
|
||||||
case p = <-dialled:
|
case w := <-dq.waitingCh:
|
||||||
dialled, waiting = nil, dq.waitingCh
|
waiting = append(waiting, w)
|
||||||
|
dialled = dq.out.DeqChan
|
||||||
continue // onto the top.
|
continue // onto the top.
|
||||||
case resp = <-waiting:
|
case p, ok := <-dialled:
|
||||||
// got a channel that's waiting for a peer.
|
if !ok {
|
||||||
log.Debugf("delivering dialled peer to DHT; took %dms.", time.Now().Sub(resp.ts)/time.Millisecond)
|
return // we're done if the ChanQueue is closed, which happens when the context is closed.
|
||||||
resp.ch <- p
|
}
|
||||||
close(resp.ch)
|
w := waiting[0]
|
||||||
dialled, waiting = dq.out.DeqChan, nil // stop consuming waiting jobs until we've cleared a peer.
|
log.Debugf("delivering dialled peer to DHT; took %dms.", time.Since(w.ts)/time.Millisecond)
|
||||||
resp.ch = nil
|
w.ch <- p
|
||||||
|
close(w.ch)
|
||||||
|
waiting = waiting[1:]
|
||||||
|
if len(waiting) == 0 {
|
||||||
|
// no more waiters, so stop consuming dialled jobs.
|
||||||
|
dialled = nil
|
||||||
|
}
|
||||||
continue // onto the top.
|
continue // onto the top.
|
||||||
default:
|
default:
|
||||||
// there's nothing to process, so proceed onto the main select block.
|
// there's nothing to process, so proceed onto the main select block.
|
||||||
@ -127,23 +128,30 @@ func (dq *dialQueue) control() {
|
|||||||
select {
|
select {
|
||||||
case <-dq.ctx.Done():
|
case <-dq.ctx.Done():
|
||||||
return
|
return
|
||||||
case p = <-dialled:
|
case w := <-dq.waitingCh:
|
||||||
dialled, waiting = nil, dq.waitingCh
|
waiting = append(waiting, w)
|
||||||
case resp = <-waiting:
|
dialled = dq.out.DeqChan
|
||||||
// got a channel that's waiting for a peer.
|
case p, ok := <-dialled:
|
||||||
log.Debugf("delivering dialled peer to DHT; took %dms.", time.Now().Sub(resp.ts)/time.Millisecond)
|
if !ok {
|
||||||
resp.ch <- p
|
return // we're done if the ChanQueue is closed, which happens when the context is closed.
|
||||||
close(resp.ch)
|
}
|
||||||
dialled, waiting = dq.out.DeqChan, nil // stop consuming waiting jobs until we've cleared a peer.
|
w := waiting[0]
|
||||||
resp.ch = nil
|
log.Debugf("delivering dialled peer to DHT; took %dms.", time.Since(w.ts)/time.Millisecond)
|
||||||
|
w.ch <- p
|
||||||
|
close(w.ch)
|
||||||
|
waiting = waiting[1:]
|
||||||
|
if len(waiting) == 0 {
|
||||||
|
// no more waiters, so stop consuming dialled jobs.
|
||||||
|
dialled = nil
|
||||||
|
}
|
||||||
case <-dq.growCh:
|
case <-dq.growCh:
|
||||||
if time.Now().Sub(lastScalingEvt) < DialQueueScalingMutePeriod {
|
if time.Since(lastScalingEvt) < DialQueueScalingMutePeriod {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
dq.grow()
|
dq.grow()
|
||||||
lastScalingEvt = time.Now()
|
lastScalingEvt = time.Now()
|
||||||
case <-dq.shrinkCh:
|
case <-dq.shrinkCh:
|
||||||
if time.Now().Sub(lastScalingEvt) < DialQueueScalingMutePeriod {
|
if time.Since(lastScalingEvt) < DialQueueScalingMutePeriod {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
dq.shrink()
|
dq.shrink()
|
||||||
@ -176,13 +184,11 @@ func (dq *dialQueue) Consume() <-chan peer.ID {
|
|||||||
|
|
||||||
// park the channel until a dialled peer becomes available.
|
// park the channel until a dialled peer becomes available.
|
||||||
select {
|
select {
|
||||||
|
case dq.waitingCh <- waitingCh{ch, time.Now()}:
|
||||||
|
// all good
|
||||||
case <-dq.ctx.Done():
|
case <-dq.ctx.Done():
|
||||||
// return a closed channel with no value if we're done.
|
// return a closed channel with no value if we're done.
|
||||||
close(ch)
|
close(ch)
|
||||||
return ch
|
|
||||||
case dq.waitingCh <- waitingCh{ch, time.Now()}:
|
|
||||||
default:
|
|
||||||
panic("detected more consuming goroutines than declared upfront")
|
|
||||||
}
|
}
|
||||||
return ch
|
return ch
|
||||||
}
|
}
|
||||||
@ -268,7 +274,7 @@ func (dq *dialQueue) worker() {
|
|||||||
log.Debugf("discarding dialled peer because of error: %v", err)
|
log.Debugf("discarding dialled peer because of error: %v", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
log.Debugf("dialling %v took %dms (as observed by the dht subsystem).", p, time.Now().Sub(t)/time.Millisecond)
|
log.Debugf("dialling %v took %dms (as observed by the dht subsystem).", p, time.Since(t)/time.Millisecond)
|
||||||
waiting := len(dq.waitingCh)
|
waiting := len(dq.waitingCh)
|
||||||
dq.out.EnqChan <- p
|
dq.out.EnqChan <- p
|
||||||
if waiting > 0 {
|
if waiting > 0 {
|
||||||
|
@ -16,31 +16,6 @@ func init() {
|
|||||||
DialQueueScalingMutePeriod = 0
|
DialQueueScalingMutePeriod = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDialQueueErrorsWithTooManyConsumers(t *testing.T) {
|
|
||||||
var calls int
|
|
||||||
defer func() {
|
|
||||||
if e := recover(); e == nil {
|
|
||||||
t.Error("expected a panic, got none")
|
|
||||||
} else if calls != 4 {
|
|
||||||
t.Errorf("expected a panic on the 4th call to Consume(); got it on call number %d", calls)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
in := queue.NewChanQueue(context.Background(), queue.NewXORDistancePQ("test"))
|
|
||||||
hang := make(chan struct{})
|
|
||||||
dialFn := func(ctx context.Context, p peer.ID) error {
|
|
||||||
<-hang
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
dq := newDialQueue(context.Background(), "test", in, dialFn, 3)
|
|
||||||
for ; calls < 3; calls++ {
|
|
||||||
dq.Consume()
|
|
||||||
}
|
|
||||||
calls++
|
|
||||||
dq.Consume()
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestDialQueueGrowsOnSlowDials(t *testing.T) {
|
func TestDialQueueGrowsOnSlowDials(t *testing.T) {
|
||||||
DialQueueMaxIdle = 10 * time.Minute
|
DialQueueMaxIdle = 10 * time.Minute
|
||||||
|
|
||||||
@ -60,7 +35,7 @@ func TestDialQueueGrowsOnSlowDials(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// remove the mute period to grow faster.
|
// remove the mute period to grow faster.
|
||||||
dq := newDialQueue(context.Background(), "test", in, dialFn, 4)
|
dq := newDialQueue(context.Background(), "test", in, dialFn)
|
||||||
|
|
||||||
for i := 0; i < 4; i++ {
|
for i := 0; i < 4; i++ {
|
||||||
_ = dq.Consume()
|
_ = dq.Consume()
|
||||||
@ -93,7 +68,7 @@ func TestDialQueueShrinksWithNoConsumers(t *testing.T) {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
dq := newDialQueue(context.Background(), "test", in, dialFn, 3)
|
dq := newDialQueue(context.Background(), "test", in, dialFn)
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
recover()
|
recover()
|
||||||
@ -160,7 +135,7 @@ func TestDialQueueShrinksWithWhenIdle(t *testing.T) {
|
|||||||
in.EnqChan <- peer.ID(i)
|
in.EnqChan <- peer.ID(i)
|
||||||
}
|
}
|
||||||
|
|
||||||
dq := newDialQueue(context.Background(), "test", in, dialFn, 3)
|
dq := newDialQueue(context.Background(), "test", in, dialFn)
|
||||||
|
|
||||||
// keep up to speed with backlog by releasing the dial function every time we acquire a channel.
|
// keep up to speed with backlog by releasing the dial function every time we acquire a channel.
|
||||||
for i := 0; i < 13; i++ {
|
for i := 0; i < 13; i++ {
|
||||||
@ -203,7 +178,7 @@ func TestDialQueueMutePeriodHonored(t *testing.T) {
|
|||||||
in.EnqChan <- peer.ID(i)
|
in.EnqChan <- peer.ID(i)
|
||||||
}
|
}
|
||||||
|
|
||||||
dq := newDialQueue(context.Background(), "test", in, dialFn, 3)
|
dq := newDialQueue(context.Background(), "test", in, dialFn)
|
||||||
|
|
||||||
// pick up three consumers.
|
// pick up three consumers.
|
||||||
for i := 0; i < 3; i++ {
|
for i := 0; i < 3; i++ {
|
||||||
|
2
query.go
2
query.go
@ -103,7 +103,7 @@ func newQueryRunner(q *dhtQuery) *dhtQueryRunner {
|
|||||||
peersToQuery: peersToQuery,
|
peersToQuery: peersToQuery,
|
||||||
proc: proc,
|
proc: proc,
|
||||||
}
|
}
|
||||||
r.peersDialed = newDialQueue(ctx, q.key, peersToQuery, r.dialPeer, AlphaValue)
|
r.peersDialed = newDialQueue(ctx, q.key, peersToQuery, r.dialPeer)
|
||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user