go-libp2p-kad-dht/dial_queue_test.go

package dht

import (
"context"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/libp2p/go-libp2p-peer"
"github.com/libp2p/go-libp2p-peerstore/queue"
)
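
// TestDialQueueGrowsOnSlowDials checks that the dial queue grows beyond
// DialQueueMinParallelism when dials hang and consumers keep requesting results.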
func TestDialQueueGrowsOnSlowDials(t *testing.T) {
in := queue.NewChanQueue(context.Background(), queue.NewXORDistancePQ("test"))
hang := make(chan struct{})
var cnt int32
dialFn := func(ctx context.Context, p peer.ID) error {
atomic.AddInt32(&cnt, 1)
	<-hang // block until the test releases the dial
return nil
}
// Enqueue 20 jobs.
for i := 0; i < 20; i++ {
in.EnqChan <- peer.ID(i)
}
// remove the mute period to grow faster.
dq := newDialQueue(context.Background(), "test", in, dialFn, 10*time.Minute, 0)
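	// each Consume that finds no completed dial should signal the pool to grow.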
for i := 0; i < 4; i++ {
_ = dq.Consume()
time.Sleep(100 * time.Millisecond)
}
for i := 0; i < 20; i++ {
if atomic.LoadInt32(&cnt) > int32(DialQueueMinParallelism) {
return
}
time.Sleep(100 * time.Millisecond)
}
	t.Errorf("expected more than %d concurrent dials, got %d", DialQueueMinParallelism, atomic.LoadInt32(&cnt))
}
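
// TestDialQueueShrinksWithNoConsumers checks that the pool scales back down
// when completed dials have no consumers waiting to receive them.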
func TestDialQueueShrinksWithNoConsumers(t *testing.T) {
// reduce interference from the other shrink path.
in := queue.NewChanQueue(context.Background(), queue.NewXORDistancePQ("test"))
hang := make(chan struct{})
wg := new(sync.WaitGroup)
wg.Add(13)
dialFn := func(ctx context.Context, p peer.ID) error {
wg.Done()
<-hang
return nil
}
dq := newDialQueue(context.Background(), "test", in, dialFn, 10*time.Minute, 0)
	// acquire 3 consumers; every time we acquire one, the pool grows because no
	// dial job has completed and is immediately returnable.
for i := 0; i < 3; i++ {
_ = dq.Consume()
}
// Enqueue 13 jobs, one per worker we'll grow to.
for i := 0; i < 13; i++ {
in.EnqChan <- peer.ID(i)
}
waitForWg(t, wg, 2*time.Second)
	// Release a few dials, but not all of them: downscaling happens when workers
	// detect there are no consumers to receive their values, so the remaining
	// three workers act as those witnesses.
for i := 0; i < 3; i++ {
hang <- struct{}{}
}
// allow enough time for signalling and dispatching values to outstanding consumers.
time.Sleep(1 * time.Second)
// unblock the rest.
for i := 0; i < 10; i++ {
hang <- struct{}{}
}
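	// dialFn closes over the wg variable, so swapping in a fresh WaitGroup here
	// redirects subsequent Done() calls to it.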
wg = new(sync.WaitGroup)
// we should now only have 6 workers, because all the shrink events will have been honoured.
wg.Add(6)
// enqueue more jobs.
for i := 0; i < 6; i++ {
in.EnqChan <- peer.ID(i)
}
// let's check we have 6 workers hanging.
waitForWg(t, wg, 2*time.Second)
}

// Inactivity = workers are idle because the DHT query is progressing slowly and
// producing too few peers to dial.
func TestDialQueueShrinksWhenIdle(t *testing.T) {
in := queue.NewChanQueue(context.Background(), queue.NewXORDistancePQ("test"))
hang := make(chan struct{})
var wg sync.WaitGroup
wg.Add(13)
dialFn := func(ctx context.Context, p peer.ID) error {
wg.Done()
<-hang
return nil
}
// Enqueue 13 jobs.
for i := 0; i < 13; i++ {
in.EnqChan <- peer.ID(i)
}
dq := newDialQueue(context.Background(), "test", in, dialFn, time.Second, 0)
	// keep up with the backlog by releasing one dial every time we acquire a consumer channel.
for i := 0; i < 13; i++ {
ch := dq.Consume()
hang <- struct{}{}
<-ch
time.Sleep(100 * time.Millisecond)
}
	// wait for the max idle period (1s, as passed to newDialQueue) to elapse.
time.Sleep(1500 * time.Millisecond)
// we should now only have 6 workers, because all the shrink events will have been honoured.
wg.Add(6)
// enqueue more jobs
for i := 0; i < 10; i++ {
in.EnqChan <- peer.ID(i)
}
// let's check we have 6 workers hanging.
waitForWg(t, &wg, 2*time.Second)
}
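
// TestDialQueueMutePeriodHonored checks that grow signals arriving within the
// mute period are ignored, so the pool stays at its minimum size.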
func TestDialQueueMutePeriodHonored(t *testing.T) {
in := queue.NewChanQueue(context.Background(), queue.NewXORDistancePQ("test"))
hang := make(chan struct{})
var wg sync.WaitGroup
wg.Add(6)
dialFn := func(ctx context.Context, p peer.ID) error {
wg.Done()
<-hang
return nil
}
// Enqueue a bunch of jobs.
for i := 0; i < 20; i++ {
in.EnqChan <- peer.ID(i)
}
dq := newDialQueue(context.Background(), "test", in, dialFn, DialQueueMaxIdle, 2*time.Second)
// pick up three consumers.
for i := 0; i < 3; i++ {
_ = dq.Consume()
time.Sleep(100 * time.Millisecond)
}
time.Sleep(500 * time.Millisecond)
// we'll only have 6 workers because the grow signals have been ignored.
waitForWg(t, &wg, 2*time.Second)
}
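
// waitForWg waits for the WaitGroup to be done, failing the test if the given
// duration elapses first.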
func waitForWg(t *testing.T, wg *sync.WaitGroup, wait time.Duration) {
t.Helper()
done := make(chan struct{})
go func() {
defer close(done)
wg.Wait()
}()
select {
case <-time.After(wait):
t.Error("timeout while waiting for WaitGroup")
case <-done:
}
}