Mirror of https://github.com/fluencelabs/go-libp2p-kad-dht (synced 2025-04-25 14:52:14 +00:00)
Tidy up bootstrapping (#235)
* Remove signal bootstrapping (remove IpfsDHT.BootstrapOnSignal)
* Type check expected interfaces on IpfsDHT
* Simplify the bootstrap logic
* Tidy up a few other things
* Include BootstrapOnce
* Add comment about duplicating sanity checks
* Use existing import naming convention
* Defer error wrapping until we need it
* Restore existing query count behaviour
parent bebd753a1d
commit 2d2bb5513c
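Net effect on the public API, read off the diff below: the bootstrap entry points no longer hand back a goprocess.Process for the caller to Close; they take a context and stop when it is cancelled.

    Before:
        func (dht *IpfsDHT) BootstrapWithConfig(cfg BootstrapConfig) (goprocess.Process, error)
        func (dht *IpfsDHT) BootstrapOnSignal(cfg BootstrapConfig, signal <-chan time.Time) (goprocess.Process, error)

    After:
        func (dht *IpfsDHT) BootstrapWithConfig(ctx context.Context, cfg BootstrapConfig) error
        func (dht *IpfsDHT) BootstrapOnce(ctx context.Context, cfg BootstrapConfig) error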
dht.go (10 changes)

@@ -64,6 +64,16 @@ type IpfsDHT struct {
 	protocols []protocol.ID // DHT protocols
 }
 
+// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
+// guarantee, but we can use them to aid refactoring.
+var (
+	_ routing.ContentRouting = (*IpfsDHT)(nil)
+	_ routing.IpfsRouting    = (*IpfsDHT)(nil)
+	_ routing.PeerRouting    = (*IpfsDHT)(nil)
+	_ routing.PubKeyFetcher  = (*IpfsDHT)(nil)
+	_ routing.ValueStore     = (*IpfsDHT)(nil)
+)
+
 // New creates a new DHT with the specified host and options.
 func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, error) {
 	var cfg opts.Options
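The var block added to dht.go is Go's standard compile-time interface check: assigning a typed nil pointer to a blank variable of the interface type makes the build fail if *IpfsDHT ever stops implementing one of the routing interfaces, with no runtime behavior. A minimal self-contained sketch of the idiom (the Fetcher interface here is hypothetical, not part of this repo):

    package main

    import "fmt"

    // Fetcher stands in for an interface like routing.ValueStore.
    type Fetcher interface {
        Fetch(key string) (string, error)
    }

    type DHT struct{}

    func (d *DHT) Fetch(key string) (string, error) { return "value-for-" + key, nil }

    // Compile-time assertion: this line refuses to build if *DHT stops
    // satisfying Fetcher. The blank identifier discards the value, and the
    // typed nil means nothing is allocated.
    var _ Fetcher = (*DHT)(nil)

    func main() {
        var f Fetcher = &DHT{}
        v, _ := f.Fetch("k")
        fmt.Println(v) // value-for-k
    }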
dht_bootstrap.go (166 changes)

@@ -7,9 +7,8 @@ import (
 	"time"
 
 	u "github.com/ipfs/go-ipfs-util"
-	goprocess "github.com/jbenet/goprocess"
-	periodicproc "github.com/jbenet/goprocess/periodic"
 	peer "github.com/libp2p/go-libp2p-peer"
+	pstore "github.com/libp2p/go-libp2p-peerstore"
 	routing "github.com/libp2p/go-libp2p-routing"
 	multiaddr "github.com/multiformats/go-multiaddr"
 )
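(The import changes follow from the rewrite below: goprocess and periodicproc drop out because nothing in this file returns a Process anymore, and pstore comes in for the pstore.PeerInfo returned by the new walk helper.)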
@@ -66,87 +65,73 @@ var DefaultBootstrapConfig = BootstrapConfig{
 	Timeout: time.Duration(10 * time.Second),
 }
 
-// Bootstrap ensures the dht routing table remains healthy as peers come and go.
-// it builds up a list of peers by requesting random peer IDs. The Bootstrap
-// process will run a number of queries each time, and run every time signal fires.
-// These parameters are configurable.
-//
-// As opposed to BootstrapWithConfig, Bootstrap satisfies the routing interface
+// A method in the IpfsRouting interface. It calls BootstrapWithConfig with
+// the default bootstrap config.
 func (dht *IpfsDHT) Bootstrap(ctx context.Context) error {
-	proc, err := dht.BootstrapWithConfig(DefaultBootstrapConfig)
-	if err != nil {
-		return err
-	}
-
-	// wait till ctx or dht.Context exits.
-	// we have to do it this way to satisfy the Routing interface (contexts)
-	go func() {
-		defer proc.Close()
-		select {
-		case <-ctx.Done():
-		case <-dht.Context().Done():
-		}
-	}()
-
-	return nil
+	return dht.BootstrapWithConfig(ctx, DefaultBootstrapConfig)
 }
 
-// BootstrapWithConfig ensures the dht routing table remains healthy as peers come and go.
-// it builds up a list of peers by requesting random peer IDs. The Bootstrap
-// process will run a number of queries each time, and run every time signal fires.
-// These parameters are configurable.
-//
-// BootstrapWithConfig returns a process, so the user can stop it.
-func (dht *IpfsDHT) BootstrapWithConfig(cfg BootstrapConfig) (goprocess.Process, error) {
+// Runs cfg.Queries bootstrap queries every cfg.Period.
+func (dht *IpfsDHT) BootstrapWithConfig(ctx context.Context, cfg BootstrapConfig) error {
+	// Because this method is not synchronous, we have to duplicate sanity
+	// checks on the config so that callers aren't oblivious.
 	if cfg.Queries <= 0 {
-		return nil, fmt.Errorf("invalid number of queries: %d", cfg.Queries)
+		return fmt.Errorf("invalid number of queries: %d", cfg.Queries)
 	}
-
-	proc := dht.Process().Go(func(p goprocess.Process) {
-		<-p.Go(dht.bootstrapWorker(cfg)).Closed()
+	go func() {
 		for {
+			err := dht.runBootstrap(ctx, cfg)
+			if err != nil {
+				log.Warningf("error bootstrapping: %s", err)
+			}
 			select {
 			case <-time.After(cfg.Period):
-				<-p.Go(dht.bootstrapWorker(cfg)).Closed()
-			case <-p.Closing():
+			case <-ctx.Done():
 				return
 			}
 		}
-	})
-
-	return proc, nil
+	}()
+	return nil
 }
 
-// SignalBootstrap ensures the dht routing table remains healthy as peers come and go.
-// it builds up a list of peers by requesting random peer IDs. The Bootstrap
-// process will run a number of queries each time, and run every time signal fires.
-// These parameters are configurable.
-//
-// SignalBootstrap returns a process, so the user can stop it.
-func (dht *IpfsDHT) BootstrapOnSignal(cfg BootstrapConfig, signal <-chan time.Time) (goprocess.Process, error) {
+// This is a synchronous bootstrap. cfg.Queries queries will run each with a
+// timeout of cfg.Timeout. cfg.Period is not used.
+func (dht *IpfsDHT) BootstrapOnce(ctx context.Context, cfg BootstrapConfig) error {
 	if cfg.Queries <= 0 {
-		return nil, fmt.Errorf("invalid number of queries: %d", cfg.Queries)
+		return fmt.Errorf("invalid number of queries: %d", cfg.Queries)
 	}
-
-	if signal == nil {
-		return nil, fmt.Errorf("invalid signal: %v", signal)
-	}
-
-	proc := periodicproc.Ticker(signal, dht.bootstrapWorker(cfg))
-
-	return proc, nil
+	return dht.runBootstrap(ctx, cfg)
 }
 
-func (dht *IpfsDHT) bootstrapWorker(cfg BootstrapConfig) func(worker goprocess.Process) {
-	return func(worker goprocess.Process) {
-		// it would be useful to be able to send out signals of when we bootstrap, too...
-		// maybe this is a good case for whole module event pub/sub?
-		ctx := dht.Context()
-		if err := dht.runBootstrap(ctx, cfg); err != nil {
-			log.Warning(err)
-			// A bootstrapping error is important to notice but not fatal.
-		}
+func newRandomPeerId() peer.ID {
+	id := make([]byte, 32) // SHA256 is the default. TODO: Use a more canonical way to generate random IDs.
+	rand.Read(id)
+	id = u.Hash(id) // TODO: Feed this directly into the multihash instead of hashing it.
+	return peer.ID(id)
+}
+
+// Traverse the DHT toward the given ID.
+func (dht *IpfsDHT) walk(ctx context.Context, target peer.ID) (pstore.PeerInfo, error) {
+	// TODO: Extract the query action (traversal logic?) inside FindPeer,
+	// don't actually call through the FindPeer machinery, which can return
+	// things out of the peer store etc.
+	return dht.FindPeer(ctx, target)
+}
+
+// Traverse the DHT toward a random ID.
+func (dht *IpfsDHT) randomWalk(ctx context.Context) error {
+	id := newRandomPeerId()
+	p, err := dht.walk(ctx, id)
+	switch err {
+	case routing.ErrNotFound:
+		return nil
+	case nil:
+		// We found a peer from a randomly generated ID. This should be very
+		// unlikely.
+		log.Warningf("random walk toward %s actually found peer: %s", id, p)
+		return nil
+	default:
+		return err
 	}
 }
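The shape of the simplification: the old code needed a goprocess process tree (Close/Closing semantics) to make the periodic loop stoppable, while the new code gets the same lifecycle from the caller's context. A self-contained sketch of the pattern, with our own names rather than the DHT's:

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    // runPeriodic mirrors what BootstrapWithConfig now does: spawn a goroutine
    // that performs one round of work, then either sleeps for period or exits
    // when the caller's context is cancelled.
    func runPeriodic(ctx context.Context, period time.Duration, work func(context.Context) error) {
        go func() {
            for {
                if err := work(ctx); err != nil {
                    fmt.Println("work error:", err)
                }
                select {
                case <-time.After(period):
                case <-ctx.Done():
                    return
                }
            }
        }()
    }

    func main() {
        ctx, cancel := context.WithCancel(context.Background())
        runPeriodic(ctx, 50*time.Millisecond, func(context.Context) error {
            fmt.Println("bootstrap round")
            return nil
        })
        time.Sleep(180 * time.Millisecond)
        cancel() // stopping the loop is now just cancelling the context
        time.Sleep(60 * time.Millisecond)
    }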
@@ -159,51 +144,24 @@ func (dht *IpfsDHT) runBootstrap(ctx context.Context, cfg BootstrapConfig) error {
 	defer bslog("end")
 	defer log.EventBegin(ctx, "dhtRunBootstrap").Done()
 
-	var merr u.MultiErr
-
-	randomID := func() peer.ID {
-		// 16 random bytes is not a valid peer id. it may be fine becuase
-		// the dht will rehash to its own keyspace anyway.
-		id := make([]byte, 16)
-		rand.Read(id)
-		id = u.Hash(id)
-		return peer.ID(id)
-	}
-
-	// bootstrap sequentially, as results will compound
-	runQuery := func(ctx context.Context, id peer.ID) {
+	doQuery := func(n int, target string, f func(context.Context) error) error {
+		log.Debugf("Bootstrapping query (%d/%d) to %s", n, cfg.Queries, target)
 		ctx, cancel := context.WithTimeout(ctx, cfg.Timeout)
 		defer cancel()
-		p, err := dht.FindPeer(ctx, id)
-		if err == routing.ErrNotFound {
-			// this isn't an error. this is precisely what we expect.
-		} else if err != nil {
-			merr = append(merr, err)
-		} else {
-			// woah, actually found a peer with that ID? this shouldn't happen normally
-			// (as the ID we use is not a real ID). this is an odd error worth logging.
-			err := fmt.Errorf("Bootstrap peer error: Actually FOUND peer. (%s, %s)", id, p)
-			log.Warningf("%s", err)
-			merr = append(merr, err)
-		}
+		return f(ctx)
 	}
 
-	// these should be parallel normally. but can make them sequential for debugging.
-	// note that the core/bootstrap context deadline should be extended too for that.
+	// Do all but one of the bootstrap queries as random walks.
 	for i := 0; i < cfg.Queries; i++ {
-		id := randomID()
-		log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, cfg.Queries, id)
-		runQuery(ctx, id)
+		err := doQuery(i, "random ID", dht.randomWalk)
+		if err != nil {
+			return err
+		}
 	}
 
 	// Find self to distribute peer info to our neighbors.
-	// Do this after bootstrapping.
-	log.Debugf("Bootstrapping query to self: %s", dht.self)
-	runQuery(ctx, dht.self)
-
-	if len(merr) > 0 {
-		return merr
-	}
-	return nil
+	return doQuery(cfg.Queries, fmt.Sprintf("self: %s", dht.self), func(ctx context.Context) error {
+		_, err := dht.walk(ctx, dht.self)
+		return err
+	})
 }
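doQuery folds the old randomID/runQuery/merr machinery into one closure: log the attempt, apply the per-query timeout, run the query, and return the first error instead of accumulating a MultiErr (the commit message's "Defer error wrapping until we need it"). Totals are unchanged per "Restore existing query count behaviour": cfg.Queries random walks plus one final walk toward self. A minimal sketch of the timeout-wrapper shape doQuery uses (names are ours, not the DHT's):

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    // withTimeout wraps one operation in a per-call deadline derived from the
    // parent context, like doQuery does with cfg.Timeout.
    func withTimeout(ctx context.Context, d time.Duration, f func(context.Context) error) error {
        ctx, cancel := context.WithTimeout(ctx, d)
        defer cancel()
        return f(ctx)
    }

    func main() {
        err := withTimeout(context.Background(), 10*time.Millisecond, func(ctx context.Context) error {
            select {
            case <-time.After(time.Second): // simulated slow DHT query
                return nil
            case <-ctx.Done():
                return fmt.Errorf("query aborted: %w", ctx.Err())
            }
        })
        fmt.Println(err) // query aborted: context deadline exceeded
    }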
dht_test.go (18 changes)

@@ -679,23 +679,10 @@ func TestPeriodicBootstrap(t *testing.T) {
 		}
 	}()
 
-	signals := []chan time.Time{}
-
 	var cfg BootstrapConfig
 	cfg = DefaultBootstrapConfig
 	cfg.Queries = 5
 
-	// kick off periodic bootstrappers with instrumented signals.
-	for _, dht := range dhts {
-		s := make(chan time.Time)
-		signals = append(signals, s)
-		proc, err := dht.BootstrapOnSignal(cfg, s)
-		if err != nil {
-			t.Fatal(err)
-		}
-		defer proc.Close()
-	}
-
 	t.Logf("dhts are not connected. %d", nDHTs)
 	for _, dht := range dhts {
 		rtlen := dht.routingTable.Size()

@@ -721,9 +708,8 @@ func TestPeriodicBootstrap(t *testing.T) {
 	}
 
 	t.Logf("bootstrapping them so they find each other. %d", nDHTs)
-	now := time.Now()
-	for _, signal := range signals {
-		go func(s chan time.Time) { s <- now }(signal)
+	for _, dht := range dhts {
+		go dht.BootstrapOnce(ctx, cfg)
 	}
 
 	// this is async, and we dont know when it's finished with one cycle, so keep checking
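The test follows suit: instead of pushing ticks into instrumented signal channels, each node now kicks off one round with go dht.BootstrapOnce(ctx, cfg). The rounds still complete asynchronously, which is why the polling loop below this change is untouched.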