package dht

import (
	"context"
	"math"
	"time"

	peer "github.com/libp2p/go-libp2p-peer"
	queue "github.com/libp2p/go-libp2p-peerstore/queue"
)

var (
	// DialQueueMinParallelism is the minimum number of worker dial goroutines that will be alive at any time.
	DialQueueMinParallelism = 6
	// DialQueueMaxParallelism is the maximum number of worker dial goroutines that can be alive at any time.
	DialQueueMaxParallelism = 20
	// DialQueueMaxIdle is the period that a worker dial goroutine waits before signalling a worker pool downscaling.
	DialQueueMaxIdle = 5 * time.Second
	// DialQueueScalingMutePeriod is the amount of time to ignore further worker pool scaling events, after one is
	// processed. Its role is to reduce jitter.
	DialQueueScalingMutePeriod = 1 * time.Second
)

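// dialQueue wires an adaptive pool of dial workers between a queue of candidate
// peers (in) and a queue of successfully dialled peers (out), and uses the grow,
// shrink and die channels to resize that pool from a single control loop.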
type dialQueue struct {
	ctx    context.Context
	dialFn func(context.Context, peer.ID) error

	nWorkers      int
	scalingFactor float64

	in  *queue.ChanQueue
	out *queue.ChanQueue

	waitingCh chan waitingCh
	dieCh     chan struct{}
	growCh    chan struct{}
	shrinkCh  chan struct{}
}

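// waitingCh pairs a consumer channel parked by Consume with the time it started
// waiting, so the control loop can log how long delivery of a dialled peer took.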
type waitingCh struct {
	ch chan<- peer.ID
	ts time.Time
}

// newDialQueue returns an adaptive dial queue that spawns a dynamically sized set of goroutines to preemptively
// stage dials for later handoff to the DHT protocol for RPC. It identifies backpressure on both ends (dial consumers
// and dial producers), and takes compensating action by adjusting the worker pool.
//
// Why? Dialling is expensive. It's orders of magnitude slower than running an RPC on an already-established
// connection, as it requires establishing a TCP connection, multistream handshake, crypto handshake, mux handshake,
// and protocol negotiation.
//
// We start with DialQueueMinParallelism workers, and scale up and down based on demand and supply of
// dialled peers.
//
// The following events trigger scaling:
// - we scale up when we can't immediately return a successful dial to a new consumer.
// - we scale down when we've been idle for a while waiting for new dial attempts.
// - we scale down when we complete a dial and realise nobody was waiting for it.
//
// Dialler throttling (e.g. FD limit exceeded) is a concern, as we can easily spin up more workers to compensate, and
// end up adding fuel to the fire. Since we have no deterministic way to detect this for now, we hard-limit concurrency
// to DialQueueMaxParallelism.
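//
// A rough usage sketch (hypothetical caller: feeding the in queue with candidate
// peers and the actual RPC are elided; dialPeer and sendRequest stand in for
// whatever dial and request functions the caller supplies):
//
//	dq := newDialQueue(ctx, target, in, dialPeer, nConsumers)
//	for i := 0; i < nConsumers; i++ {
//		go func() {
//			p, ok := <-dq.Consume() // blocks until a dial completes or ctx is cancelled.
//			if !ok {
//				return // closed without a peer; the queue is shutting down.
//			}
//			sendRequest(ctx, p) // hypothetical RPC to the now-connected peer.
//		}()
//	}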
func newDialQueue(ctx context.Context, target string, in *queue.ChanQueue, dialFn func(context.Context, peer.ID) error, nConsumers int) *dialQueue {
	sq := &dialQueue{
		ctx:           ctx,
		dialFn:        dialFn,
		nWorkers:      DialQueueMinParallelism,
		scalingFactor: 1.5,

		in:  in,
		out: queue.NewChanQueue(ctx, queue.NewXORDistancePQ(target)),

		growCh:    make(chan struct{}, nConsumers),
		shrinkCh:  make(chan struct{}, 1),
		waitingCh: make(chan waitingCh, nConsumers),
		dieCh:     make(chan struct{}, DialQueueMaxParallelism),
	}
	for i := 0; i < DialQueueMinParallelism; i++ {
		go sq.worker()
	}
	go sq.control()
	return sq
}

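// control is the single-threaded event loop of the dial queue: it hands dialled
// peers from the out queue to parked consumers, and it alone acts on grow/shrink
// signals, so no locking is needed around nWorkers.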
func (dq *dialQueue) control() {
	var (
		p              peer.ID
		dialled        = dq.out.DeqChan
		resp           waitingCh
		waiting        <-chan waitingCh
		lastScalingEvt = time.Now()
	)

	defer func() {
		// close channels.
		if resp.ch != nil {
			close(resp.ch)
		}
		close(dq.waitingCh)
		for w := range dq.waitingCh {
			close(w.ch)
		}
	}()

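	// The loop below arms and disarms the dialled/waiting cases by swapping the
	// channels with nil: a receive from a nil channel blocks forever, so `dialled`
	// is live only while we have no peer in hand, and `waiting` is live only while
	// we hold a dialled peer that still needs a consumer.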
	for {
		// First process any backlog of dial jobs and waiters -- making progress is the priority.
		// This block is copied below; couldn't find a more concise way of doing this.
		select {
		case <-dq.ctx.Done():
			return
		case p = <-dialled:
			dialled, waiting = nil, dq.waitingCh
			continue // onto the top.
		case resp = <-waiting:
			// got a channel that's waiting for a peer.
			log.Debugf("delivering dialled peer to DHT; took %dms.", time.Since(resp.ts)/time.Millisecond)
			resp.ch <- p
			close(resp.ch)
			dialled, waiting = dq.out.DeqChan, nil // stop consuming waiting jobs until we've cleared a peer.
			resp.ch = nil
			continue // onto the top.
		default:
			// there's nothing to process, so proceed onto the main select block.
		}

		select {
		case <-dq.ctx.Done():
			return
		case p = <-dialled:
			dialled, waiting = nil, dq.waitingCh
		case resp = <-waiting:
			// got a channel that's waiting for a peer.
			log.Debugf("delivering dialled peer to DHT; took %dms.", time.Since(resp.ts)/time.Millisecond)
			resp.ch <- p
			close(resp.ch)
			dialled, waiting = dq.out.DeqChan, nil // stop consuming waiting jobs until we've cleared a peer.
			resp.ch = nil
		case <-dq.growCh:
			if time.Since(lastScalingEvt) < DialQueueScalingMutePeriod {
				continue
			}
			dq.grow()
			lastScalingEvt = time.Now()
		case <-dq.shrinkCh:
			if time.Since(lastScalingEvt) < DialQueueScalingMutePeriod {
				continue
			}
			dq.shrink()
			lastScalingEvt = time.Now()
		}
	}
}

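// Consume returns a single-use channel that yields at most one dialled peer. If one
// is immediately available it is delivered right away; otherwise the channel is
// parked with the control loop until a dial completes, or closed without a value
// when the context is cancelled.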
func (dq *dialQueue) Consume() <-chan peer.ID {
	ch := make(chan peer.ID, 1)

	select {
	case p := <-dq.out.DeqChan:
		// short circuit and return a dialled peer if it's immediately available.
		ch <- p
		close(ch)
		return ch
	case <-dq.ctx.Done():
		// return a closed channel with no value if we're done.
		close(ch)
		return ch
	default:
	}

	// we have no finished dials to return; trigger a scale-up.
	select {
	case dq.growCh <- struct{}{}:
	default:
	}

	// park the channel until a dialled peer becomes available.
	select {
	case <-dq.ctx.Done():
		// return a closed channel with no value if we're done.
		close(ch)
		return ch
	case dq.waitingCh <- waitingCh{ch, time.Now()}:
	default:
		panic("detected more consuming goroutines than declared upfront")
	}
	return ch
}

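// grow scales the worker pool up by scalingFactor, capped at DialQueueMaxParallelism.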
func (dq *dialQueue) grow() {
	// no mutex needed as this is only called from the (single-threaded) control loop.
	defer func(prev int) {
		if prev == dq.nWorkers {
			return
		}
		log.Debugf("grew dial worker pool: %d => %d", prev, dq.nWorkers)
	}(dq.nWorkers)

	if dq.nWorkers == DialQueueMaxParallelism {
		return
	}
	target := int(math.Floor(float64(dq.nWorkers) * dq.scalingFactor))
	if target > DialQueueMaxParallelism {
		target = DialQueueMaxParallelism
	}
	for ; dq.nWorkers < target; dq.nWorkers++ {
		go dq.worker()
	}
}

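// shrink scales the worker pool down by scalingFactor, flooring at DialQueueMinParallelism.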
func (dq *dialQueue) shrink() {
	// no mutex needed as this is only called from the (single-threaded) control loop.
	defer func(prev int) {
		if prev == dq.nWorkers {
			return
		}
		log.Debugf("shrunk dial worker pool: %d => %d", prev, dq.nWorkers)
	}(dq.nWorkers)

	if dq.nWorkers == DialQueueMinParallelism {
		return
	}
	target := int(math.Floor(float64(dq.nWorkers) / dq.scalingFactor))
	if target < DialQueueMinParallelism {
		target = DialQueueMinParallelism
	}
	// send as many die signals as workers we have to prune.
	for ; dq.nWorkers > target; dq.nWorkers-- {
		select {
		case dq.dieCh <- struct{}{}:
		default:
			log.Debugf("too many die signals queued up.")
		}
	}
}

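// worker takes peers off the in queue, dials them, and enqueues successful dials on
// the out queue. It exits when the context is cancelled or when it receives a die
// signal from shrink.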
func (dq *dialQueue) worker() {
	// This idle timer tracks if the environment is slow. If we're waiting too long to acquire a peer to dial,
	// it means that the DHT query is progressing slowly and we should shrink the worker pool.
	idleTimer := time.NewTimer(24 * time.Hour) // placeholder init value which will be overridden immediately.
	for {
		// trap exit signals first.
		select {
		case <-dq.ctx.Done():
			return
		case <-dq.dieCh:
			return
		default:
		}

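		// Stop and drain the idle timer before reusing it: the non-blocking receive
		// clears any pending expiry so the Reset below starts a fresh idle period.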
		idleTimer.Stop()
		select {
		case <-idleTimer.C:
		default:
		}
		idleTimer.Reset(DialQueueMaxIdle)

		select {
		case <-dq.dieCh:
			return
		case <-dq.ctx.Done():
			return
		case <-idleTimer.C:
			// no new dial requests during our idle period; time to scale down.
		case p := <-dq.in.DeqChan:
			t := time.Now()
			if err := dq.dialFn(dq.ctx, p); err != nil {
				log.Debugf("discarding dialled peer because of error: %v", err)
				continue
			}
			log.Debugf("dialling %v took %dms (as observed by the dht subsystem).", p, time.Since(t)/time.Millisecond)
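			// Snapshot the waiter count before enqueueing: handing the peer over may
			// consume a waiter concurrently, so a check taken afterwards could
			// under-count demand and emit a spurious shrink signal.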
			waiting := len(dq.waitingCh)
			dq.out.EnqChan <- p
			if waiting > 0 {
				// we have somebody to deliver this value to, so no need to shrink.
				continue
			}
		}

		// scaling down; control only arrives here if the idle timer fires, or if there are no goroutines
		// waiting for the value we just produced.
		select {
		case dq.shrinkCh <- struct{}{}:
		default:
		}
	}
}