Mirror of https://github.com/fluencelabs/go-libp2p-kad-dht (synced 2025-04-24 22:32:13 +00:00)
enhance logging and import prefixes.
This commit is contained in:
parent f044043c3d
commit 95a0975dd7
dial_queue.go
@@ -7,8 +7,8 @@ import (
     "sync"
     "time"
 
-    "github.com/libp2p/go-libp2p-peer"
-    "github.com/libp2p/go-libp2p-peerstore/queue"
+    peer "github.com/libp2p/go-libp2p-peer"
+    queue "github.com/libp2p/go-libp2p-peerstore/queue"
 )
 
 var DialQueueMinParallelism = 6
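Note: a minimal sketch (not part of this commit) of what the explicit prefixes buy. The last path element of github.com/libp2p/go-libp2p-peer is "go-libp2p-peer", but the package it declares is "peer"; naming the binding documents that at the import site instead of leaving it implicit:

package main

import (
    peer "github.com/libp2p/go-libp2p-peer"             // binds the identifier "peer" explicitly
    queue "github.com/libp2p/go-libp2p-peerstore/queue" // likewise for "queue"
)

// Both identifiers below resolve through the named bindings above,
// not through the import paths' last elements.
var (
    _ peer.ID          // the peer identifier type used throughout the dial queue
    _ *queue.ChanQueue // the channel-backed peer queue used for in/out
)

func main() {}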
@@ -29,12 +29,17 @@ type dialQueue struct {
     in  *queue.ChanQueue
     out *queue.ChanQueue
 
-    waitingCh chan chan<- peer.ID
+    waitingCh chan waitingCh
     dieCh     chan struct{}
     growCh    chan struct{}
     shrinkCh  chan struct{}
 }
 
+type waitingCh struct {
+    ch chan<- peer.ID
+    ts time.Time
+}
+
 // newDialQueue returns an adaptive dial queue that spawns a dynamically sized set of goroutines to preemptively
 // stage dials for later handoff to the DHT protocol for RPC. It identifies backpressure on both ends (dial consumers
 // and dial producers), and takes compensating action by adjusting the worker pool.
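Note: the new waitingCh struct pairs each consumer's reply channel with the moment it started waiting, which lets the control loop report per-consumer wait latency instead of reusing one shared timestamp. A self-contained sketch of the pattern (names are illustrative, not from this repo):

package main

import (
    "fmt"
    "time"
)

// waiter carries a reply channel plus the time the consumer parked it.
type waiter struct {
    ch chan<- string
    ts time.Time
}

func main() {
    replies := make(chan string, 1)
    w := waiter{ch: replies, ts: time.Now()}

    time.Sleep(10 * time.Millisecond) // stand-in for waiting on a dialled peer

    // The servicing side can measure exactly how long this waiter waited.
    fmt.Printf("served after %dms\n", time.Since(w.ts)/time.Millisecond)
    w.ch <- "peer-id"
    close(w.ch)

    fmt.Println(<-replies)
}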
@@ -66,7 +71,7 @@ func newDialQueue(ctx context.Context, target string, in *queue.ChanQueue, dialF
 
         growCh:    make(chan struct{}, nConsumers),
         shrinkCh:  make(chan struct{}, 1),
-        waitingCh: make(chan chan<- peer.ID, nConsumers),
+        waitingCh: make(chan waitingCh, nConsumers),
         dieCh:     make(chan struct{}, DialQueueMaxParallelism),
     }
     for i := 0; i < DialQueueMinParallelism; i++ {
@@ -78,11 +83,10 @@ func newDialQueue(ctx context.Context, target string, in *queue.ChanQueue, dialF
 
 func (dq *dialQueue) control() {
     var (
-        t              time.Time // for logging purposes
         p              peer.ID
        dialled        = dq.out.DeqChan
-        resp           chan<- peer.ID
-        waiting        chan chan<- peer.ID
+        resp           waitingCh
+        waiting        <-chan waitingCh
         lastScalingEvt = time.Now()
     )
     for {
@@ -92,14 +96,13 @@ func (dq *dialQueue) control() {
         case <-dq.ctx.Done():
             return
         case p = <-dialled:
-            t = time.Now()
             dialled, waiting = nil, dq.waitingCh
             continue // onto the top.
         case resp = <-waiting:
             // got a channel that's waiting for a peer.
-            log.Debugf("delivering dialled peer to DHT; took %dms.", time.Now().Sub(t)/time.Millisecond)
-            resp <- p
-            close(resp)
+            log.Debugf("delivering dialled peer to DHT; took %dms.", time.Now().Sub(resp.ts)/time.Millisecond)
+            resp.ch <- p
+            close(resp.ch)
             dialled, waiting = dq.out.DeqChan, nil // stop consuming waiting jobs until we've cleared a peer.
             continue // onto the top.
         default:
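Note: the dialled/waiting swap above relies on a standard Go trick: a receive from a nil channel blocks forever, so assigning nil to a channel variable disables its select case. A runnable toy (independent of this codebase) showing the alternation between taking a value and serving a waiter:

package main

import "fmt"

func main() {
    dialled := make(chan int, 1)      // stands in for dq.out.DeqChan
    handoff := make(chan chan int, 1) // stands in for dq.waitingCh
    var waiting chan chan int         // nil: its case starts disabled

    dialled <- 42
    resp := make(chan int, 1)
    handoff <- resp

    var p int
    for i := 0; i < 2; i++ {
        select {
        case p = <-dialled:
            // got a value: stop taking values, start serving waiters.
            dialled, waiting = nil, handoff
        case ch := <-waiting:
            // got a waiter: deliver the held value.
            ch <- p
        }
    }
    fmt.Println(<-resp) // prints 42
}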
@@ -110,13 +113,12 @@ func (dq *dialQueue) control() {
         case <-dq.ctx.Done():
             return
         case p = <-dialled:
-            t = time.Now()
             dialled, waiting = nil, dq.waitingCh
         case resp = <-waiting:
             // got a channel that's waiting for a peer.
-            log.Debugf("delivering dialled peer to DHT; took %dms.", time.Now().Sub(t)/time.Millisecond)
-            resp <- p
-            close(resp)
+            log.Debugf("delivering dialled peer to DHT; took %dms.", time.Now().Sub(resp.ts)/time.Millisecond)
+            resp.ch <- p
+            close(resp.ch)
             dialled, waiting = dq.out.DeqChan, nil // stop consuming waiting jobs until we've cleared a peer.
         case <-dq.growCh:
             if time.Now().Sub(lastScalingEvt) < DialQueueScalingMutePeriod {
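Note: the growCh case visible in the context lines is rate-limited by DialQueueScalingMutePeriod, evidently so that bursts of backpressure signals do not thrash the worker pool. A hedged sketch of that guard in isolation (the helper function is mine, not from the diff):

package main

import (
    "fmt"
    "time"
)

// muted reports whether a scaling event should be dropped because it falls
// inside the mute window that follows the previous scaling event.
func muted(last, now time.Time, mute time.Duration) bool {
    return now.Sub(last) < mute
}

func main() {
    mute := 100 * time.Millisecond
    last := time.Now()

    fmt.Println(muted(last, last.Add(50*time.Millisecond), mute))  // true: dropped
    fmt.Println(muted(last, last.Add(200*time.Millisecond), mute)) // false: acted on
}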
@@ -156,7 +158,7 @@ func (dq *dialQueue) Consume() (<-chan peer.ID, error) {
 
     // park the channel until a dialled peer becomes available.
     select {
-    case dq.waitingCh <- ch:
+    case dq.waitingCh <- waitingCh{ch, time.Now()}:
     default:
         return nil, errors.New("detected more consuming goroutines than declared upfront")
     }
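Note: this rejection can be non-blocking because waitingCh is buffered to the declared consumer count (nConsumers, per the constructor hunk above); a send with a default branch then fails fast once the buffer is full. A standalone sketch of the same admission check:

package main

import (
    "errors"
    "fmt"
)

// admit succeeds while the buffered channel has spare capacity and fails
// fast, without blocking, once more callers arrive than were declared.
func admit(slots chan struct{}) error {
    select {
    case slots <- struct{}{}:
        return nil
    default:
        return errors.New("more consumers than declared upfront")
    }
}

func main() {
    slots := make(chan struct{}, 2) // declared capacity: 2 consumers
    for i := 0; i < 3; i++ {
        fmt.Println(admit(slots)) // <nil>, <nil>, then the error
    }
}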
@@ -166,6 +168,13 @@ func (dq *dialQueue) Consume() (<-chan peer.ID, error) {
 func (dq *dialQueue) grow() {
     dq.lk.Lock()
     defer dq.lk.Unlock()
+    defer func(prev int) {
+        if prev == dq.nWorkers {
+            return
+        }
+        log.Debugf("grew dial worker pool: %d => %d", prev, dq.nWorkers)
+    }(dq.nWorkers)
+
     if dq.nWorkers == DialQueueMaxParallelism {
         return
     }
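Note: the deferred closure added here (and mirrored in shrink() below) relies on deferred call arguments being evaluated at defer time: passing dq.nWorkers captures the pre-change value, so the log prints old => new after the mutation and stays silent on no-ops. The same pattern in isolation:

package main

import "fmt"

type pool struct{ nWorkers int }

func (p *pool) grow(target int) {
    defer func(prev int) { // prev is captured now, before the mutation below
        if prev == p.nWorkers {
            return // nothing changed: skip the log line
        }
        fmt.Printf("grew worker pool: %d => %d\n", prev, p.nWorkers)
    }(p.nWorkers)

    if target > p.nWorkers {
        p.nWorkers = target
    }
}

func main() {
    p := &pool{nWorkers: 6}
    p.grow(10) // grew worker pool: 6 => 10
    p.grow(10) // silent: no change
}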
@@ -181,6 +190,13 @@ func (dq *dialQueue) shrink() {
 func (dq *dialQueue) shrink() {
     dq.lk.Lock()
     defer dq.lk.Unlock()
+    defer func(prev int) {
+        if prev == dq.nWorkers {
+            return
+        }
+        log.Debugf("shrunk dial worker pool: %d => %d", prev, dq.nWorkers)
+    }(dq.nWorkers)
+
     if dq.nWorkers == DialQueueMinParallelism {
         return
     }
@@ -229,10 +245,12 @@ func (dq *dialQueue) worker() {
         case <-idleTimer.C:
             // no new dial requests during our idle period; time to scale down.
         case p := <-dq.in.DeqChan:
+            t := time.Now()
             if err := dq.dialFn(dq.ctx, p); err != nil {
                 log.Debugf("discarding dialled peer because of error: %v", err)
                 continue
             }
+            log.Debugf("dialling %v took %dms (as observed by the dht subsystem).", p, time.Now().Sub(t)/time.Millisecond)
             waiting := len(dq.waitingCh)
             dq.out.EnqChan <- p
             if waiting > 0 {
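Note: the added timing lines spell the measurement as time.Now().Sub(t); time.Since(t) is the standard library's shorthand for exactly that expression, should a follow-up prefer it:

package main

import (
    "fmt"
    "time"
)

func main() {
    t := time.Now()
    time.Sleep(5 * time.Millisecond)

    // time.Since(t) is defined as time.Now().Sub(t).
    fmt.Println(time.Now().Sub(t) / time.Millisecond)
    fmt.Println(time.Since(t) / time.Millisecond)
}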
@@ -247,6 +265,5 @@ func (dq *dialQueue) worker() {
         case dq.shrinkCh <- struct{}{}:
         default:
         }
-
     }
 }
dial_queue_test.go
@@ -6,8 +6,8 @@ import (
     "testing"
     "time"
 
-    "github.com/libp2p/go-libp2p-peer"
-    "github.com/libp2p/go-libp2p-peerstore/queue"
+    peer "github.com/libp2p/go-libp2p-peer"
+    queue "github.com/libp2p/go-libp2p-peerstore/queue"
 )
 
 func init() {