mirror of
https://github.com/fluencelabs/go-libp2p-kad-dht
synced 2025-07-03 06:31:38 +00:00
defer dialqueue action until initial peers have been added.
This commit is contained in:
@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
peer "github.com/libp2p/go-libp2p-peer"
|
peer "github.com/libp2p/go-libp2p-peer"
|
||||||
@ -33,6 +34,7 @@ type dialQueue struct {
|
|||||||
|
|
||||||
nWorkers uint
|
nWorkers uint
|
||||||
out *queue.ChanQueue
|
out *queue.ChanQueue
|
||||||
|
started int32
|
||||||
|
|
||||||
waitingCh chan waitingCh
|
waitingCh chan waitingCh
|
||||||
dieCh chan struct{}
|
dieCh chan struct{}
|
||||||
@ -90,9 +92,10 @@ type waitingCh struct {
|
|||||||
ts time.Time
|
ts time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
// newDialQueue returns an adaptive dial queue that spawns a dynamically sized set of goroutines to preemptively
|
// newDialQueue returns an _unstarted_ adaptive dial queue that spawns a dynamically sized set of goroutines to
|
||||||
// stage dials for later handoff to the DHT protocol for RPC. It identifies backpressure on both ends (dial consumers
|
// preemptively stage dials for later handoff to the DHT protocol for RPC. It identifies backpressure on both
|
||||||
// and dial producers), and takes compensating action by adjusting the worker pool.
|
// ends (dial consumers and dial producers), and takes compensating action by adjusting the worker pool. To
|
||||||
|
// activate the dial queue, call Start().
|
||||||
//
|
//
|
||||||
// Why? Dialing is expensive. It's orders of magnitude slower than running an RPC on an already-established
|
// Why? Dialing is expensive. It's orders of magnitude slower than running an RPC on an already-established
|
||||||
// connection, as it requires establishing a TCP connection, multistream handshake, crypto handshake, mux handshake,
|
// connection, as it requires establishing a TCP connection, multistream handshake, crypto handshake, mux handshake,
|
||||||
@ -112,7 +115,6 @@ type waitingCh struct {
|
|||||||
func newDialQueue(params *dqParams) (*dialQueue, error) {
|
func newDialQueue(params *dqParams) (*dialQueue, error) {
|
||||||
dq := &dialQueue{
|
dq := &dialQueue{
|
||||||
dqParams: params,
|
dqParams: params,
|
||||||
nWorkers: params.config.minParallelism,
|
|
||||||
out: queue.NewChanQueue(params.ctx, queue.NewXORDistancePQ(params.target)),
|
out: queue.NewChanQueue(params.ctx, queue.NewXORDistancePQ(params.target)),
|
||||||
growCh: make(chan struct{}, 1),
|
growCh: make(chan struct{}, 1),
|
||||||
shrinkCh: make(chan struct{}, 1),
|
shrinkCh: make(chan struct{}, 1),
|
||||||
@ -120,13 +122,22 @@ func newDialQueue(params *dqParams) (*dialQueue, error) {
|
|||||||
dieCh: make(chan struct{}, params.config.maxParallelism),
|
dieCh: make(chan struct{}, params.config.maxParallelism),
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := 0; i < int(params.config.minParallelism); i++ {
|
|
||||||
go dq.worker()
|
|
||||||
}
|
|
||||||
go dq.control()
|
go dq.control()
|
||||||
return dq, nil
|
return dq, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Start initiates action on this dial queue. It should only be called once; subsequent calls are ignored.
|
||||||
|
func (dq *dialQueue) Start() {
|
||||||
|
if !atomic.CompareAndSwapInt32(&dq.started, 0, 1) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
tgt := int(dq.dqParams.config.minParallelism)
|
||||||
|
for i := 0; i < tgt; i++ {
|
||||||
|
go dq.worker()
|
||||||
|
}
|
||||||
|
dq.nWorkers = uint(tgt)
|
||||||
|
}
|
||||||
|
|
||||||
func (dq *dialQueue) control() {
|
func (dq *dialQueue) control() {
|
||||||
var (
|
var (
|
||||||
dialled <-chan peer.ID
|
dialled <-chan peer.ID
|
||||||
|
@ -42,6 +42,8 @@ func TestDialQueueGrowsOnSlowDials(t *testing.T) {
|
|||||||
t.Error("unexpected error when constructing the dial queue", err)
|
t.Error("unexpected error when constructing the dial queue", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
dq.Start()
|
||||||
|
|
||||||
for i := 0; i < 4; i++ {
|
for i := 0; i < 4; i++ {
|
||||||
_ = dq.Consume()
|
_ = dq.Consume()
|
||||||
time.Sleep(100 * time.Millisecond)
|
time.Sleep(100 * time.Millisecond)
|
||||||
@ -86,6 +88,8 @@ func TestDialQueueShrinksWithNoConsumers(t *testing.T) {
|
|||||||
t.Error("unexpected error when constructing the dial queue", err)
|
t.Error("unexpected error when constructing the dial queue", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
dq.Start()
|
||||||
|
|
||||||
// acquire 3 consumers, everytime we acquire a consumer, we will grow the pool because no dial job is completed
|
// acquire 3 consumers, everytime we acquire a consumer, we will grow the pool because no dial job is completed
|
||||||
// and immediately returnable.
|
// and immediately returnable.
|
||||||
for i := 0; i < 3; i++ {
|
for i := 0; i < 3; i++ {
|
||||||
@ -158,6 +162,8 @@ func TestDialQueueShrinksWithWhenIdle(t *testing.T) {
|
|||||||
t.Error("unexpected error when constructing the dial queue", err)
|
t.Error("unexpected error when constructing the dial queue", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
dq.Start()
|
||||||
|
|
||||||
// keep up to speed with backlog by releasing the dial function every time we acquire a channel.
|
// keep up to speed with backlog by releasing the dial function every time we acquire a channel.
|
||||||
for i := 0; i < 13; i++ {
|
for i := 0; i < 13; i++ {
|
||||||
ch := dq.Consume()
|
ch := dq.Consume()
|
||||||
@ -210,6 +216,8 @@ func TestDialQueueMutePeriodHonored(t *testing.T) {
|
|||||||
t.Error("unexpected error when constructing the dial queue", err)
|
t.Error("unexpected error when constructing the dial queue", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
dq.Start()
|
||||||
|
|
||||||
// pick up three consumers.
|
// pick up three consumers.
|
||||||
for i := 0; i < 3; i++ {
|
for i := 0; i < 3; i++ {
|
||||||
_ = dq.Consume()
|
_ = dq.Consume()
|
||||||
|
5
query.go
5
query.go
@ -136,6 +136,11 @@ func (r *dhtQueryRunner) Run(ctx context.Context, peers []peer.ID) (*dhtQueryRes
|
|||||||
r.addPeerToQuery(p)
|
r.addPeerToQuery(p)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// start the dial queue only after we've added the initial set of peers.
|
||||||
|
// this is to avoid race conditions that could cause the peersRemaining todoctr
|
||||||
|
// to be done too early if the initial dial fails before others make it into the queue.
|
||||||
|
r.peersDialed.Start()
|
||||||
|
|
||||||
// go do this thing.
|
// go do this thing.
|
||||||
// do it as a child proc to make sure Run exits
|
// do it as a child proc to make sure Run exits
|
||||||
// ONLY AFTER spawn workers has exited.
|
// ONLY AFTER spawn workers has exited.
|
||||||
|
Reference in New Issue
Block a user