Mirror of https://github.com/fluencelabs/go-libp2p-kad-dht (synced 2025-08-01 04:22:05 +00:00)
cleanup unused code

Committed by: Steven Allen
Parent: b4f7fda731
Commit: 8ef9d774c9

dial_queue.go (360 lines removed)
@@ -1,360 +0,0 @@
package dht

import (
    "context"
    "math"
    "sync"
    "time"

    "github.com/libp2p/go-libp2p-core/peer"
    queue "github.com/libp2p/go-libp2p-peerstore/queue"
)

const (
    // DefaultDialQueueMinParallelism is the default value for the minimum number of worker dial goroutines that will
    // be alive at any time.
    DefaultDialQueueMinParallelism = 6
    // DefaultDialQueueMaxParallelism is the default value for the maximum number of worker dial goroutines that can
    // be alive at any time.
    DefaultDialQueueMaxParallelism = 20
    // DefaultDialQueueMaxIdle is the default value for the period that a worker dial goroutine waits before signalling
    // a worker pool downscaling.
    DefaultDialQueueMaxIdle = 5 * time.Second
    // DefaultDialQueueScalingMutePeriod is the default value for the amount of time to ignore further worker pool
    // scaling events, after one is processed. Its role is to reduce jitter.
    DefaultDialQueueScalingMutePeriod = 1 * time.Second
    // DefaultDialQueueScalingFactor is the default factor by which the current number of workers will be multiplied
    // or divided when upscaling and downscaling events occur, respectively.
    DefaultDialQueueScalingFactor = 1.5
)

type dialQueue struct {
    *dqParams

    nWorkers  uint
    out       *queue.ChanQueue
    startOnce sync.Once

    waitingCh chan waitingCh
    dieCh     chan struct{}
    growCh    chan struct{}
    shrinkCh  chan struct{}
}

type dqParams struct {
    ctx    context.Context
    target string
    dialFn func(context.Context, peer.ID) error
    in     *queue.ChanQueue
    config dqConfig
}

type dqConfig struct {
    // minParallelism is the minimum number of worker dial goroutines that will be alive at any time.
    minParallelism uint
    // maxParallelism is the maximum number of worker dial goroutines that can be alive at any time.
    maxParallelism uint
    // scalingFactor is the factor by which the current number of workers will be multiplied or divided when upscaling
    // and downscaling events occur, respectively.
    scalingFactor float64
    // mutePeriod is the amount of time to ignore further worker pool scaling events, after one is processed.
    // Its role is to reduce jitter.
    mutePeriod time.Duration
    // maxIdle is the period that a worker dial goroutine waits before signalling a worker pool downscaling.
    maxIdle time.Duration
}

// dqDefaultConfig returns the default configuration for dial queues. See const documentation to learn the default values.
func dqDefaultConfig() dqConfig {
    return dqConfig{
        minParallelism: DefaultDialQueueMinParallelism,
        maxParallelism: DefaultDialQueueMaxParallelism,
        scalingFactor:  DefaultDialQueueScalingFactor,
        maxIdle:        DefaultDialQueueMaxIdle,
        mutePeriod:     DefaultDialQueueScalingMutePeriod,
    }
}

type waitingCh struct {
    ch chan<- peer.ID
    ts time.Time
}

// newDialQueue returns an _unstarted_ adaptive dial queue that spawns a dynamically sized set of goroutines to
// preemptively stage dials for later handoff to the DHT protocol for RPC. It identifies backpressure on both
// ends (dial consumers and dial producers), and takes compensating action by adjusting the worker pool. To
// activate the dial queue, call Start().
//
// Why? Dialing is expensive. It's orders of magnitude slower than running an RPC on an already-established
// connection, as it requires establishing a TCP connection, multistream handshake, crypto handshake, mux handshake,
// and protocol negotiation.
//
// We start with config.minParallelism number of workers, and scale up and down based on demand and supply of
// dialled peers.
//
// The following events trigger scaling:
// - we scale up when we can't immediately return a successful dial to a new consumer.
// - we scale down when we've been idle for a while waiting for new dial attempts.
// - we scale down when we complete a dial and realise nobody was waiting for it.
//
// Dialler throttling (e.g. FD limit exceeded) is a concern, as we can easily spin up more workers to compensate, and
// end up adding fuel to the fire. Since we have no deterministic way to detect this for now, we hard-limit concurrency
// to config.maxParallelism.
func newDialQueue(params *dqParams) (*dialQueue, error) {
    dq := &dialQueue{
        dqParams:  params,
        out:       queue.NewChanQueue(params.ctx, queue.NewXORDistancePQ(params.target)),
        growCh:    make(chan struct{}, 1),
        shrinkCh:  make(chan struct{}, 1),
        waitingCh: make(chan waitingCh),
        dieCh:     make(chan struct{}, params.config.maxParallelism),
    }

    return dq, nil
}

// Start initiates action on this dial queue. It should only be called once; subsequent calls are ignored.
func (dq *dialQueue) Start() {
    dq.startOnce.Do(func() {
        go dq.control()
    })
}

func (dq *dialQueue) control() {
    var (
        dialled        <-chan peer.ID
        waiting        []waitingCh
        lastScalingEvt = time.Now()
    )

    defer func() {
        for _, w := range waiting {
            close(w.ch)
        }
        waiting = nil
    }()

    // start workers

    tgt := int(dq.dqParams.config.minParallelism)
    for i := 0; i < tgt; i++ {
        go dq.worker()
    }
    dq.nWorkers = uint(tgt)

    // control workers

    for {
        // First process any backlog of dial jobs and waiters -- making progress is the priority.
        // This block is copied below; couldn't find a more concise way of doing this.
        select {
        case <-dq.ctx.Done():
            return
        case w := <-dq.waitingCh:
            waiting = append(waiting, w)
            dialled = dq.out.DeqChan
            continue // onto the top.
        case p, ok := <-dialled:
            if !ok {
                return // we're done if the ChanQueue is closed, which happens when the context is closed.
            }
            w := waiting[0]
            logger.Debugf("delivering dialled peer to DHT; took %dms.", time.Since(w.ts)/time.Millisecond)
            w.ch <- p
            close(w.ch)
            waiting = waiting[1:]
            if len(waiting) == 0 {
                // no more waiters, so stop consuming dialled jobs.
                dialled = nil
            }
            continue // onto the top.
        default:
            // there's nothing to process, so proceed onto the main select block.
        }

        select {
        case <-dq.ctx.Done():
            return
        case w := <-dq.waitingCh:
            waiting = append(waiting, w)
            dialled = dq.out.DeqChan
        case p, ok := <-dialled:
            if !ok {
                return // we're done if the ChanQueue is closed, which happens when the context is closed.
            }
            w := waiting[0]
            logger.Debugf("delivering dialled peer to DHT; took %dms.", time.Since(w.ts)/time.Millisecond)
            w.ch <- p
            close(w.ch)
            waiting = waiting[1:]
            if len(waiting) == 0 {
                // no more waiters, so stop consuming dialled jobs.
                dialled = nil
            }
        case <-dq.growCh:
            if time.Since(lastScalingEvt) < dq.config.mutePeriod {
                continue
            }
            dq.grow()
            lastScalingEvt = time.Now()
        case <-dq.shrinkCh:
            if time.Since(lastScalingEvt) < dq.config.mutePeriod {
                continue
            }
            dq.shrink()
            lastScalingEvt = time.Now()
        }
    }
}

func (dq *dialQueue) Consume() <-chan peer.ID {
    ch := make(chan peer.ID, 1)

    select {
    case p, ok := <-dq.out.DeqChan:
        // short circuit and return a dialled peer if it's immediately available, or abort if DeqChan is closed.
        if ok {
            ch <- p
        }
        close(ch)
        return ch
    case <-dq.ctx.Done():
        // return a closed channel with no value if we're done.
        close(ch)
        return ch
    default:
    }

    // we have no finished dials to return, trigger a scale up.
    select {
    case dq.growCh <- struct{}{}:
    default:
    }

    // park the channel until a dialled peer becomes available.
    select {
    case dq.waitingCh <- waitingCh{ch, time.Now()}:
        // all good
    case <-dq.ctx.Done():
        // return a closed channel with no value if we're done.
        close(ch)
    }
    return ch
}

func (dq *dialQueue) grow() {
    // no mutex needed as this is only called from the (single-threaded) control loop.
    defer func(prev uint) {
        if prev == dq.nWorkers {
            return
        }
        logger.Debugf("grew dial worker pool: %d => %d", prev, dq.nWorkers)
    }(dq.nWorkers)

    if dq.nWorkers == dq.config.maxParallelism {
        return
    }
    // choosing not to worry about uint wrapping beyond max value.
    target := uint(math.Floor(float64(dq.nWorkers) * dq.config.scalingFactor))
    if target > dq.config.maxParallelism {
        target = dq.config.maxParallelism
    }
    for ; dq.nWorkers < target; dq.nWorkers++ {
        go dq.worker()
    }
}

func (dq *dialQueue) shrink() {
    // no mutex needed as this is only called from the (single-threaded) control loop.
    defer func(prev uint) {
        if prev == dq.nWorkers {
            return
        }
        logger.Debugf("shrunk dial worker pool: %d => %d", prev, dq.nWorkers)
    }(dq.nWorkers)

    if dq.nWorkers == dq.config.minParallelism {
        return
    }
    target := uint(math.Floor(float64(dq.nWorkers) / dq.config.scalingFactor))
    if target < dq.config.minParallelism {
        target = dq.config.minParallelism
    }
    // send as many die signals as workers we have to prune.
    for ; dq.nWorkers > target; dq.nWorkers-- {
        select {
        case dq.dieCh <- struct{}{}:
        default:
            logger.Debugf("too many die signals queued up.")
        }
    }
}

func (dq *dialQueue) worker() {
    // This idle timer tracks if the environment is slow. If we're waiting too long to acquire a peer to dial,
    // it means that the DHT query is progressing slowly and we should shrink the worker pool.
    idleTimer := time.NewTimer(24 * time.Hour) // placeholder init value which will be overridden immediately.
    defer idleTimer.Stop()
    for {
        // trap exit signals first.
        select {
        case <-dq.ctx.Done():
            return
        case <-dq.dieCh:
            return
        default:
        }

        idleTimer.Stop()
        select {
        case <-idleTimer.C:
        default:
            // NOTE: There is a slight race here. We could be in the
            // middle of firing the timer and not read anything from the channel.
            //
            // However, that's not really a huge issue. We'll think
            // we're idle but that's fine.
        }
        idleTimer.Reset(dq.config.maxIdle)

        select {
        case <-dq.dieCh:
            return
        case <-dq.ctx.Done():
            return
        case <-idleTimer.C:
            // no new dial requests during our idle period; time to scale down.
        case p, ok := <-dq.in.DeqChan:
            if !ok {
                return
            }

            t := time.Now()
            if err := dq.dialFn(dq.ctx, p); err != nil {
                logger.Debugf("discarding dialled peer because of error: %v", err)
                continue
            }
            logger.Debugf("dialling %v took %dms (as observed by the dht subsystem).", p, time.Since(t)/time.Millisecond)
            waiting := len(dq.waitingCh)

            // by the time we're done dialling, it's possible that the context is closed, in which case there will
            // be nobody listening on dq.out.EnqChan and we could block forever.
            select {
            case dq.out.EnqChan <- p:
            case <-dq.ctx.Done():
                return
            }
            if waiting > 0 {
                // we have somebody to deliver this value to, so no need to shrink.
                continue
            }
        }

        // scaling down; control only arrives here if the idle timer fires, or if there are no goroutines
        // waiting for the value we just produced.
        select {
        case dq.shrinkCh <- struct{}{}:
        default:
        }
    }
}
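To make the scaling behaviour described in the newDialQueue comment concrete, here is a small standalone sketch (not part of the commit) that replays the grow() arithmetic under the default configuration. The constants and the floor-then-cap formula are taken from the deleted file; everything else is illustrative.

package main

import (
    "fmt"
    "math"
)

func main() {
    // Defaults copied from the deleted dial_queue.go.
    const (
        minParallelism = 6
        maxParallelism = 20
        scalingFactor  = 1.5
    )

    // Replay four consecutive grow events, using the same arithmetic as
    // dialQueue.grow(): multiply by the scaling factor, floor, cap at max.
    workers := uint(minParallelism)
    for i := 1; i <= 4; i++ {
        next := uint(math.Floor(float64(workers) * scalingFactor))
        if next > maxParallelism {
            next = maxParallelism
        }
        fmt.Printf("grow event %d: %d -> %d workers\n", i, workers, next)
        workers = next
    }
    // Prints the ladder 6 -> 9 -> 13 -> 19 -> 20, i.e. capped at maxParallelism.
}

Shrink events walk the same ladder in the opposite direction, dividing by the scaling factor and flooring at minParallelism.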
@@ -1,247 +0,0 @@
package dht

import (
    "context"
    "sync"
    "sync/atomic"
    "testing"
    "time"

    "github.com/libp2p/go-libp2p-core/peer"
    queue "github.com/libp2p/go-libp2p-peerstore/queue"
)

func TestDialQueueGrowsOnSlowDials(t *testing.T) {
    in := queue.NewChanQueue(context.Background(), queue.NewXORDistancePQ("test"))
    hang := make(chan struct{})

    var cnt int32
    dialFn := func(ctx context.Context, p peer.ID) error {
        atomic.AddInt32(&cnt, 1)
        <-hang
        return nil
    }

    // Enqueue 20 jobs.
    for i := 0; i < 20; i++ {
        in.EnqChan <- peer.ID(i)
    }

    // remove the mute period to grow faster.
    config := dqDefaultConfig()
    config.maxIdle = 10 * time.Minute
    config.mutePeriod = 0
    dq, err := newDialQueue(&dqParams{
        ctx:    context.Background(),
        target: "test",
        in:     in,
        dialFn: dialFn,
        config: config,
    })
    if err != nil {
        t.Error("unexpected error when constructing the dial queue", err)
    }

    dq.Start()

    for i := 0; i < 4; i++ {
        _ = dq.Consume()
        time.Sleep(100 * time.Millisecond)
    }

    for i := 0; i < 20; i++ {
        if atomic.LoadInt32(&cnt) > int32(DefaultDialQueueMinParallelism) {
            return
        }
        time.Sleep(100 * time.Millisecond)
    }

    t.Errorf("expected 19 concurrent dials, got %d", atomic.LoadInt32(&cnt))
}

func TestDialQueueShrinksWithNoConsumers(t *testing.T) {
    // reduce interference from the other shrink path.

    in := queue.NewChanQueue(context.Background(), queue.NewXORDistancePQ("test"))
    hang := make(chan struct{})

    wg := new(sync.WaitGroup)
    wg.Add(13)
    dialFn := func(ctx context.Context, p peer.ID) error {
        wg.Done()
        <-hang
        return nil
    }

    config := dqDefaultConfig()
    config.maxIdle = 10 * time.Minute
    config.mutePeriod = 0
    dq, err := newDialQueue(&dqParams{
        ctx:    context.Background(),
        target: "test",
        in:     in,
        dialFn: dialFn,
        config: config,
    })
    if err != nil {
        t.Error("unexpected error when constructing the dial queue", err)
    }

    dq.Start()

    // acquire 3 consumers; every time we acquire a consumer, we will grow the pool because no dial job is completed
    // and immediately returnable.
    for i := 0; i < 3; i++ {
        _ = dq.Consume()
    }

    // Enqueue 13 jobs, one per worker we'll grow to.
    for i := 0; i < 13; i++ {
        in.EnqChan <- peer.ID(i)
    }

    waitForWg(t, wg, 2*time.Second)

    // Release a few dialFn, but not all of them, because downscaling happens when workers detect there are no
    // consumers to consume their values. So the other three will be these witnesses.
    for i := 0; i < 3; i++ {
        hang <- struct{}{}
    }

    // allow enough time for signalling and dispatching values to outstanding consumers.
    time.Sleep(1 * time.Second)

    // unblock the rest.
    for i := 0; i < 10; i++ {
        hang <- struct{}{}
    }

    wg = new(sync.WaitGroup)
    // we should now only have 6 workers, because all the shrink events will have been honoured.
    wg.Add(6)

    // enqueue more jobs.
    for i := 0; i < 6; i++ {
        in.EnqChan <- peer.ID(i)
    }

    // let's check we have 6 workers hanging.
    waitForWg(t, wg, 2*time.Second)
}

// Inactivity = workers are idle because the DHT query is progressing slowly and is producing too few peers to dial.
func TestDialQueueShrinksWithWhenIdle(t *testing.T) {
    in := queue.NewChanQueue(context.Background(), queue.NewXORDistancePQ("test"))
    hang := make(chan struct{})

    var wg sync.WaitGroup
    wg.Add(13)
    dialFn := func(ctx context.Context, p peer.ID) error {
        wg.Done()
        <-hang
        return nil
    }

    // Enqueue 13 jobs.
    for i := 0; i < 13; i++ {
        in.EnqChan <- peer.ID(i)
    }

    config := dqDefaultConfig()
    config.maxIdle = 1 * time.Second
    config.mutePeriod = 0
    dq, err := newDialQueue(&dqParams{
        ctx:    context.Background(),
        target: "test",
        in:     in,
        dialFn: dialFn,
        config: config,
    })
    if err != nil {
        t.Error("unexpected error when constructing the dial queue", err)
    }

    dq.Start()

    // keep up to speed with backlog by releasing the dial function every time we acquire a channel.
    for i := 0; i < 13; i++ {
        ch := dq.Consume()
        hang <- struct{}{}
        <-ch
        time.Sleep(100 * time.Millisecond)
    }

    // wait for MaxIdlePeriod.
    time.Sleep(1500 * time.Millisecond)

    // we should now only have 6 workers, because all the shrink events will have been honoured.
    wg.Add(6)

    // enqueue more jobs
    for i := 0; i < 10; i++ {
        in.EnqChan <- peer.ID(i)
    }

    // let's check we have 6 workers hanging.
    waitForWg(t, &wg, 2*time.Second)
}

func TestDialQueueMutePeriodHonored(t *testing.T) {
    in := queue.NewChanQueue(context.Background(), queue.NewXORDistancePQ("test"))
    hang := make(chan struct{})
    var wg sync.WaitGroup
    wg.Add(6)
    dialFn := func(ctx context.Context, p peer.ID) error {
        wg.Done()
        <-hang
        return nil
    }

    // Enqueue a bunch of jobs.
    for i := 0; i < 20; i++ {
        in.EnqChan <- peer.ID(i)
    }

    config := dqDefaultConfig()
    config.mutePeriod = 2 * time.Second
    dq, err := newDialQueue(&dqParams{
        ctx:    context.Background(),
        target: "test",
        in:     in,
        dialFn: dialFn,
        config: config,
    })
    if err != nil {
        t.Error("unexpected error when constructing the dial queue", err)
    }

    dq.Start()

    // pick up three consumers.
    for i := 0; i < 3; i++ {
        _ = dq.Consume()
        time.Sleep(100 * time.Millisecond)
    }

    time.Sleep(500 * time.Millisecond)

    // we'll only have 6 workers because the grow signals have been ignored.
    waitForWg(t, &wg, 2*time.Second)
}

func waitForWg(t *testing.T, wg *sync.WaitGroup, wait time.Duration) {
    t.Helper()

    done := make(chan struct{})
    go func() {
        defer close(done)
        wg.Wait()
    }()

    select {
    case <-time.After(wait):
        t.Error("timeout while waiting for WaitGroup")
    case <-done:
    }
}
@@ -68,91 +68,3 @@ func (ph *peerMetricHeap) Pop() interface{} {
     ph.data = old[0 : n-1]
     return item
 }
-
-/*
-// KPeerSet implements heap.Interface and PeerQueue
-type KPeerSet struct {
-    kvalue int
-
-    // from is the Key this PQ measures against
-    from ks.Key
-
-    // heap is a heap of peerDistance items
-    heap peerMetricHeap
-
-    lock sync.RWMutex
-}
-
-func (pq *KPeerSet) Len() int {
-    pq.lock.RLock()
-    defer pq.lock.RUnlock()
-
-    return len(pq.heap)
-}
-
-func (pq *KPeerSet) Check(p peer.ID) bool {
-    pq.lock.RLock()
-    defer pq.lock.RUnlock()
-
-    if pq.heap.Len() < pq.kvalue {
-        return true
-    }
-
-    distance := ks.XORKeySpace.Key([]byte(p)).Distance(pq.from)
-    return distance.Cmp(pq.heap[0].metric) != -1
-}
-
-func (pq *KPeerSet) Add(p peer.ID) (bool, peer.ID) {
-    pq.lock.Lock()
-    defer pq.lock.Unlock()
-
-    distance := ks.XORKeySpace.Key([]byte(p)).Distance(pq.from)
-
-    var replacedPeer peer.ID
-    if pq.heap.Len() >= pq.kvalue {
-        // If we're not closer than the worst peer, drop this.
-        if distance.Cmp(pq.heap[0].metric) != -1 {
-            return false, replacedPeer
-        }
-        // Replacing something, remove it.
-        replacedPeer = heap.Pop(&pq.heap).(*peerMetric).peer
-    }
-
-    heap.Push(&pq.heap, &peerMetric{
-        peer:   p,
-        metric: distance,
-    })
-    return true, replacedPeer
-}
-
-func (pq *KPeerSet) Remove(id peer.ID) {
-    pq.lock.Lock()
-    defer pq.lock.Unlock()
-
-    for i, pm := range pq.heap {
-        if pm.peer == id {
-            heap.Remove(&pq.heap, i)
-            return
-        }
-    }
-}
-
-func (pq *KPeerSet) Peers() []peer.ID {
-    pq.lock.RLock()
-    defer pq.lock.RUnlock()
-
-    ret := make([]peer.ID, len(pq.heap))
-    for _, pm := range pq.heap {
-        ret = append(ret, pm.peer)
-    }
-    return ret
-}
-
-func New(kvalue int, from string) *KPeerSet {
-    return &KPeerSet{
-        from:   ks.XORKeySpace.Key([]byte(from)),
-        kvalue: kvalue,
-        heap:   make([]*peerMetric, 0, kvalue),
-    }
-}
-*/
@@ -26,45 +26,6 @@ func (p peerLatencyMetricList) Less(i, j int) bool {
 func (p peerLatencyMetricList) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
 func (p peerLatencyMetricList) GetPeerID(i int) peer.ID { return p[i].peer }
-
-func less(pm1, pm2 *peerLatencyMetric) bool {
-    p1Connectedness, p2Connectedness := pm1.connectedness, pm2.connectedness
-    p1Latency, p2Latency := pm1.latency, pm2.latency
-
-    // Compare latency assuming that connected is lower latency than unconnected
-    if p1Connectedness == network.Connected {
-        if p2Connectedness == network.Connected {
-            return p1Latency < p2Latency
-        }
-        return true
-    }
-    if p2Connectedness == network.Connected {
-        return false
-    }
-
-    // Compare latency assuming recent connection is lower latency than older connection.
-    // TODO: This assumption largely stems from our latency library showing peers we know nothing about as
-    // having zero latency
-    if p1Connectedness == network.CanConnect {
-        if p2Connectedness == network.CanConnect {
-            return p1Latency > p2Latency
-        }
-        return true
-    }
-    if p2Connectedness == network.CanConnect {
-        return false
-    }
-
-    // Check if either peer has proven to be unconnectable, if so rank them low
-    if p1Connectedness == network.CannotConnect && p2Connectedness != network.CannotConnect {
-        return false
-    }
-    if p2Connectedness == network.CannotConnect && p1Connectedness != network.CannotConnect {
-        return true
-    }
-
-    return pm1.metric.Cmp(pm2.metric) == -1
-}
-
 func calculationLess(pm1, pm2 peerLatencyMetric) bool {
     return calc(pm1).Cmp(calc(pm2)) == -1
 }
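For readers skimming the removal, the ordering policy that the deleted less comparator encoded can be summarised in a small self-contained sketch (illustrative only; the simplified candidate type and state constants here are not from the repository): connected peers sort first by ascending latency, can-connect peers come next, and cannot-connect peers sink to the bottom. The real comparator additionally ordered can-connect peers by descending latency and fell back to XOR distance on ties, which this sketch omits.

package main

import (
    "fmt"
    "sort"
    "time"
)

// Simplified stand-ins for network.Connectedness; illustrative only.
const (
    connected = iota
    canConnect
    cannotConnect
)

type candidate struct {
    name    string
    state   int
    latency time.Duration
}

// less mirrors the shape of the removed comparator: connected peers first
// (lower latency wins), then can-connect peers, with cannot-connect last.
func less(a, b candidate) bool {
    if a.state != b.state {
        return a.state < b.state
    }
    if a.state == connected {
        return a.latency < b.latency
    }
    return false // tie within the other states: keep input order in this sketch
}

func main() {
    peers := []candidate{
        {"p1", cannotConnect, 0},
        {"p2", connected, 80 * time.Millisecond},
        {"p3", canConnect, 10 * time.Millisecond},
        {"p4", connected, 20 * time.Millisecond},
    }
    sort.Slice(peers, func(i, j int) bool { return less(peers[i], peers[j]) })
    for _, p := range peers {
        fmt.Println(p.name) // prints p4, p2, p3, p1
    }
}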
@@ -110,54 +71,3 @@ func PeersSortedByLatency(peers []IPeerMetric, net network.Network, metrics peer
     sort.Sort(lst)
     return lst
 }
-
-func SortByLatency(net network.Network, metrics peerstore.Metrics) func(peers []*peerMetric) []peer.ID {
-    return func(peers []*peerMetric) []peer.ID {
-        metricLst := NewPeerMetricList(peers, func(p1, p2 *peerMetric) bool {
-            p1Connectedness := net.Connectedness(p1.peer)
-            p2Connectedness := net.Connectedness(p2.peer)
-
-            // Compare latency assuming that connected is lower latency than unconnected
-            if p1Connectedness == network.Connected {
-                if p2Connectedness == network.Connected {
-                    return metrics.LatencyEWMA(p1.peer) > metrics.LatencyEWMA(p2.peer)
-                }
-                return true
-            }
-            if p2Connectedness == network.Connected {
-                return false
-            }
-
-            // Compare latency assuming recent connection is lower latency than older connection.
-            // TODO: This assumption largely stems from our latency library showing peers we know nothing about as
-            // having zero latency
-            if p1Connectedness == network.CanConnect {
-                if p2Connectedness == network.CanConnect {
-                    return metrics.LatencyEWMA(p1.peer) > metrics.LatencyEWMA(p2.peer)
-                }
-                return true
-            }
-            if p2Connectedness == network.CanConnect {
-                return false
-            }
-
-            // Check if either peer has proven to be unconnectable, if so rank them low
-            if p1Connectedness == network.CannotConnect && p2Connectedness != network.CannotConnect {
-                return false
-            }
-            if p2Connectedness == network.CannotConnect && p1Connectedness != network.CannotConnect {
-                return true
-            }
-
-            return p1.metric.Cmp(p2.metric) == -1
-        })
-
-        sort.Stable(metricLst)
-        peerLst := make([]peer.ID, metricLst.Len())
-        for i := range peerLst {
-            peerLst[i] = metricLst.GetPeerID(i)
-        }
-
-        return peerLst
-    }
-}
@@ -14,34 +14,6 @@ type SortablePeers interface {
     GetPeerID(i int) peer.ID
 }
-
-type comparer func(id1, id2 *peerMetric) bool
-
-type peerMetricList struct {
-    data []*peerMetric
-    cmp  comparer
-}
-
-func (pm peerMetricList) Len() int { return len(pm.data) }
-
-func (pm peerMetricList) Less(i, j int) bool {
-    return pm.cmp(pm.data[i], pm.data[j])
-}
-
-func (pm peerMetricList) Swap(i, j int) {
-    pm.data[i], pm.data[j] = pm.data[j], pm.data[i]
-}
-
-func (pm peerMetricList) GetPeerID(i int) peer.ID {
-    return pm.data[i].peer
-}
-
-func NewPeerMetricList(peers []*peerMetric, cmp func(p1, p2 *peerMetric) bool) peerMetricList {
-    return peerMetricList{
-        data: peers,
-        cmp:  cmp,
-    }
-}
-
 func NewSortedPeerset(kvalue int, from string, sortPeers func([]IPeerMetric) SortablePeers) *SortedPeerset {
     fromKey := ks.XORKeySpace.Key([]byte(from))

query.go (24 changed lines)
@@ -15,10 +15,10 @@ import (
 // ErrNoPeersQueried is returned when we failed to connect to any peers.
 var ErrNoPeersQueried = errors.New("failed to query any peers")

-type qfn func(context.Context, peer.ID) ([]*peer.AddrInfo, error)
-type sfn func(*kpeerset.SortedPeerset) bool
+type queryFn func(context.Context, peer.ID) ([]*peer.AddrInfo, error)
+type stopFn func(*kpeerset.SortedPeerset) bool

-type qu struct {
+type query struct {
     ctx    context.Context
     cancel context.CancelFunc

@@ -26,11 +26,11 @@ type qu struct {

     localPeers           *kpeerset.SortedPeerset
     globallyQueriedPeers *peer.Set
-    queryFn qfn
-    stopFn  sfn
+    queryFn queryFn
+    stopFn  stopFn
 }

-func (dht *IpfsDHT) runDisjointQueries(ctx context.Context, d int, target string, queryFn qfn, stopFn sfn) ([]*qu, error) {
+func (dht *IpfsDHT) runDisjointQueries(ctx context.Context, d int, target string, queryFn queryFn, stopFn stopFn) ([]*query, error) {
     queryCtx, cancelQuery := context.WithCancel(ctx)

     numQueriesComplete := 0

@@ -49,11 +49,11 @@ func (dht *IpfsDHT) runDisjointQueries(ctx context.Context, d int, target string
     seedPeers[i], seedPeers[j] = seedPeers[j], seedPeers[i]
 })

-queries := make([]*qu, d)
+queries := make([]*query, d)

 peersQueried := peer.NewSet()
 for i := 0; i < d; i++ {
-    query := &qu{
+    query := &query{
         ctx:    queryCtx,
         cancel: cancelQuery,
         dht:    dht,

@@ -98,7 +98,7 @@ func (dht *IpfsDHT) sortPeers(peers []kpeerset.IPeerMetric) kpeerset.SortablePee
     return kpeerset.PeersSortedByLatency(peers, dht.host.Network(), dht.peerstore)
 }

-func strictParallelismQuery(q *qu) {
+func strictParallelismQuery(q *query) {
     /*
         start with K closest peers (some queried already some not)
         take best alpha (sorted by some metric)

@@ -150,7 +150,7 @@ func strictParallelismQuery(q *qu) {
     }
 }

-func simpleQuery(q *qu) {
+func simpleQuery(q *query) {
     /*
         start with K closest peers (some queried already some not)
         take best alpha (sorted by some metric)

@@ -210,7 +210,7 @@ func simpleQuery(q *qu) {
     }
 }

-func boundedDialQuery(q *qu) {
+func boundedDialQuery(q *query) {
     /*
         start with K closest peers (some queried already some not)
         take best alpha (sorted by some metric)

@@ -268,7 +268,7 @@ type queryResult struct {
     foundCloserPeer bool
 }

-func (q *qu) queryPeer(ctx context.Context, p peer.ID) *queryResult {
+func (q *query) queryPeer(ctx context.Context, p peer.ID) *queryResult {
     dialCtx, queryCtx := ctx, ctx

     if err := q.dht.dialPeer(dialCtx, p); err != nil {

@@ -320,9 +320,9 @@ func (dht *IpfsDHT) updatePeerValues(ctx context.Context, key string, val []byte
     }
 }

-func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan struct{}) (<-chan RecvdVal, <-chan []*qu) {
+func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan struct{}) (<-chan RecvdVal, <-chan []*query) {
     valCh := make(chan RecvdVal, 1)
-    queriesCh := make(chan []*qu, 1)
+    queriesCh := make(chan []*query, 1)

     if rec, err := dht.getLocal(key); rec != nil && err == nil {
         select {