mirror of
https://github.com/fluencelabs/go-libp2p-kad-dht
synced 2025-06-24 10:11:43 +00:00
core: cleaned up bootstrap process
This commit is contained in:
116
dht_bootstrap.go
116
dht_bootstrap.go
@ -17,52 +17,42 @@ import (
|
|||||||
periodicproc "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic"
|
periodicproc "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic"
|
||||||
)
|
)
|
||||||
|
|
||||||
// DefaultBootstrapQueries specifies how many queries to run,
|
// BootstrapConfig specifies parameters used bootstrapping the DHT.
|
||||||
// if the user does not specify a different number as an option.
|
|
||||||
//
|
//
|
||||||
// For now, this is set to 16 queries, which is an aggressive number.
|
// Note there is a tradeoff between the bootstrap period and the
|
||||||
// We are currently more interested in ensuring we have a properly formed
|
// number of queries. We could support a higher period with less
|
||||||
// DHT than making sure our dht minimizes traffic. Once we are more certain
|
// queries.
|
||||||
// of our implementation's robustness, we should lower this down to 8 or 4.
|
type BootstrapConfig struct {
|
||||||
|
Queries int // how many queries to run per period
|
||||||
|
Period time.Duration // how often to run periodi cbootstrap.
|
||||||
|
Timeout time.Duration // how long to wait for a bootstrao query to run
|
||||||
|
}
|
||||||
|
|
||||||
|
var DefaultBootstrapConfig = BootstrapConfig{
|
||||||
|
// For now, this is set to 1 query.
|
||||||
|
// We are currently more interested in ensuring we have a properly formed
|
||||||
|
// DHT than making sure our dht minimizes traffic. Once we are more certain
|
||||||
|
// of our implementation's robustness, we should lower this down to 8 or 4.
|
||||||
|
Queries: 1,
|
||||||
|
|
||||||
|
// For now, this is set to 10 seconds, which is an aggressive period. We are
|
||||||
|
// We are currently more interested in ensuring we have a properly formed
|
||||||
|
// DHT than making sure our dht minimizes traffic. Once we are more certain
|
||||||
|
// implementation's robustness, we should lower this down to 30s or 1m.
|
||||||
|
Period: time.Duration(20 * time.Second),
|
||||||
|
|
||||||
|
Timeout: time.Duration(20 * time.Second),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Bootstrap ensures the dht routing table remains healthy as peers come and go.
|
||||||
|
// it builds up a list of peers by requesting random peer IDs. The Bootstrap
|
||||||
|
// process will run a number of queries each time, and run every time signal fires.
|
||||||
|
// These parameters are configurable.
|
||||||
//
|
//
|
||||||
// Note there is also a tradeoff between the bootstrap period and the number
|
// Bootstrap returns a process, so the user can stop it.
|
||||||
// of queries. We could support a higher period with a smaller number of
|
func (dht *IpfsDHT) Bootstrap(config BootstrapConfig) (goprocess.Process, error) {
|
||||||
// queries
|
sig := time.Tick(config.Period)
|
||||||
const DefaultBootstrapQueries = 1
|
return dht.BootstrapOnSignal(config, sig)
|
||||||
|
|
||||||
// DefaultBootstrapPeriod specifies how often to periodically run bootstrap,
|
|
||||||
// if the user does not specify a different number as an option.
|
|
||||||
//
|
|
||||||
// For now, this is set to 10 seconds, which is an aggressive period. We are
|
|
||||||
// We are currently more interested in ensuring we have a properly formed
|
|
||||||
// DHT than making sure our dht minimizes traffic. Once we are more certain
|
|
||||||
// implementation's robustness, we should lower this down to 30s or 1m.
|
|
||||||
//
|
|
||||||
// Note there is also a tradeoff between the bootstrap period and the number
|
|
||||||
// of queries. We could support a higher period with a smaller number of
|
|
||||||
// queries
|
|
||||||
const DefaultBootstrapPeriod = time.Duration(10 * time.Second)
|
|
||||||
|
|
||||||
// DefaultBootstrapTimeout specifies how long to wait for a bootstrap query
|
|
||||||
// to run.
|
|
||||||
const DefaultBootstrapTimeout = time.Duration(10 * time.Second)
|
|
||||||
|
|
||||||
// Bootstrap runs bootstrapping once, then calls SignalBootstrap with default
|
|
||||||
// parameters: DefaultBootstrapQueries and DefaultBootstrapPeriod. This allows
|
|
||||||
// the user to catch an error off the bat if the connections are faulty. It also
|
|
||||||
// allows BootstrapOnSignal not to run bootstrap at the beginning, which is useful
|
|
||||||
// for instrumenting it on tests, or delaying bootstrap until the network is online
|
|
||||||
// and connected to at least a few nodes.
|
|
||||||
//
|
|
||||||
// Like PeriodicBootstrap, Bootstrap returns a process, so the user can stop it.
|
|
||||||
func (dht *IpfsDHT) Bootstrap(ctx context.Context) (goprocess.Process, error) {
|
|
||||||
|
|
||||||
if err := dht.runBootstrap(ctx, DefaultBootstrapQueries); err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
sig := time.Tick(DefaultBootstrapPeriod)
|
|
||||||
return dht.BootstrapOnSignal(DefaultBootstrapQueries, sig)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// SignalBootstrap ensures the dht routing table remains healthy as peers come and go.
|
// SignalBootstrap ensures the dht routing table remains healthy as peers come and go.
|
||||||
@ -71,9 +61,9 @@ func (dht *IpfsDHT) Bootstrap(ctx context.Context) (goprocess.Process, error) {
|
|||||||
// These parameters are configurable.
|
// These parameters are configurable.
|
||||||
//
|
//
|
||||||
// SignalBootstrap returns a process, so the user can stop it.
|
// SignalBootstrap returns a process, so the user can stop it.
|
||||||
func (dht *IpfsDHT) BootstrapOnSignal(queries int, signal <-chan time.Time) (goprocess.Process, error) {
|
func (dht *IpfsDHT) BootstrapOnSignal(cfg BootstrapConfig, signal <-chan time.Time) (goprocess.Process, error) {
|
||||||
if queries <= 0 {
|
if cfg.Queries <= 0 {
|
||||||
return nil, fmt.Errorf("invalid number of queries: %d", queries)
|
return nil, fmt.Errorf("invalid number of queries: %d", cfg.Queries)
|
||||||
}
|
}
|
||||||
|
|
||||||
if signal == nil {
|
if signal == nil {
|
||||||
@ -85,27 +75,9 @@ func (dht *IpfsDHT) BootstrapOnSignal(queries int, signal <-chan time.Time) (gop
|
|||||||
// maybe this is a good case for whole module event pub/sub?
|
// maybe this is a good case for whole module event pub/sub?
|
||||||
|
|
||||||
ctx := dht.Context()
|
ctx := dht.Context()
|
||||||
if err := dht.runBootstrap(ctx, queries); err != nil {
|
if err := dht.runBootstrap(ctx, cfg); err != nil {
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
// A bootstrapping error is important to notice but not fatal.
|
// A bootstrapping error is important to notice but not fatal.
|
||||||
// maybe the client should be able to consume these errors,
|
|
||||||
// though I dont have a clear use case in mind-- what **could**
|
|
||||||
// the client do if one of the bootstrap calls fails?
|
|
||||||
//
|
|
||||||
// This is also related to the core's bootstrap failures.
|
|
||||||
// superviseConnections should perhaps allow clients to detect
|
|
||||||
// bootstrapping problems.
|
|
||||||
//
|
|
||||||
// Anyway, passing errors could be done with a bootstrapper object.
|
|
||||||
// this would imply the client should be able to consume a lot of
|
|
||||||
// other non-fatal dht errors too. providing this functionality
|
|
||||||
// should be done correctly DHT-wide.
|
|
||||||
// NB: whatever the design, clients must ensure they drain errors!
|
|
||||||
// This pattern is common to many things, perhaps long-running services
|
|
||||||
// should have something like an ErrStream that allows clients to consume
|
|
||||||
// periodic errors and take action. It should allow the user to also
|
|
||||||
// ignore all errors with something like an ErrStreamDiscard. We should
|
|
||||||
// study what other systems do for ideas.
|
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -113,7 +85,7 @@ func (dht *IpfsDHT) BootstrapOnSignal(queries int, signal <-chan time.Time) (gop
|
|||||||
}
|
}
|
||||||
|
|
||||||
// runBootstrap builds up list of peers by requesting random peer IDs
|
// runBootstrap builds up list of peers by requesting random peer IDs
|
||||||
func (dht *IpfsDHT) runBootstrap(ctx context.Context, queries int) error {
|
func (dht *IpfsDHT) runBootstrap(ctx context.Context, cfg BootstrapConfig) error {
|
||||||
bslog := func(msg string) {
|
bslog := func(msg string) {
|
||||||
log.Debugf("DHT %s dhtRunBootstrap %s -- routing table size: %d", dht.self, msg, dht.routingTable.Size())
|
log.Debugf("DHT %s dhtRunBootstrap %s -- routing table size: %d", dht.self, msg, dht.routingTable.Size())
|
||||||
}
|
}
|
||||||
@ -133,7 +105,7 @@ func (dht *IpfsDHT) runBootstrap(ctx context.Context, queries int) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// bootstrap sequentially, as results will compound
|
// bootstrap sequentially, as results will compound
|
||||||
ctx, cancel := context.WithTimeout(ctx, DefaultBootstrapTimeout)
|
ctx, cancel := context.WithTimeout(ctx, cfg.Timeout)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
runQuery := func(ctx context.Context, id peer.ID) {
|
runQuery := func(ctx context.Context, id peer.ID) {
|
||||||
p, err := dht.FindPeer(ctx, id)
|
p, err := dht.FindPeer(ctx, id)
|
||||||
@ -154,9 +126,9 @@ func (dht *IpfsDHT) runBootstrap(ctx context.Context, queries int) error {
|
|||||||
if sequential {
|
if sequential {
|
||||||
// these should be parallel normally. but can make them sequential for debugging.
|
// these should be parallel normally. but can make them sequential for debugging.
|
||||||
// note that the core/bootstrap context deadline should be extended too for that.
|
// note that the core/bootstrap context deadline should be extended too for that.
|
||||||
for i := 0; i < queries; i++ {
|
for i := 0; i < cfg.Queries; i++ {
|
||||||
id := randomID()
|
id := randomID()
|
||||||
log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, queries, id)
|
log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, cfg.Queries, id)
|
||||||
runQuery(ctx, id)
|
runQuery(ctx, id)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -166,13 +138,13 @@ func (dht *IpfsDHT) runBootstrap(ctx context.Context, queries int) error {
|
|||||||
// normally, we should be selecting on ctx.Done() here too, but this gets
|
// normally, we should be selecting on ctx.Done() here too, but this gets
|
||||||
// complicated to do with WaitGroup, and doesnt wait for the children to exit.
|
// complicated to do with WaitGroup, and doesnt wait for the children to exit.
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
for i := 0; i < queries; i++ {
|
for i := 0; i < cfg.Queries; i++ {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
|
||||||
id := randomID()
|
id := randomID()
|
||||||
log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, queries, id)
|
log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, cfg.Queries, id)
|
||||||
runQuery(ctx, id)
|
runQuery(ctx, id)
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user