Mirror of https://github.com/fluencelabs/go-libp2p-kad-dht (synced 2025-04-24 22:32:13 +00:00)
routing/dht: periodic bootstrapping #572
This commit is contained in:
parent
dc586ab20f
commit
81867969d2
dht.go (63 changed lines)
@@ -370,66 +370,3 @@ func (dht *IpfsDHT) PingRoutine(t time.Duration) {
		}
	}
}

// Bootstrap builds up list of peers by requesting random peer IDs
func (dht *IpfsDHT) Bootstrap(ctx context.Context, queries int) error {
	var merr u.MultiErr

	randomID := func() peer.ID {
		// 16 random bytes is not a valid peer id. it may be fine because
		// the dht will rehash to its own keyspace anyway.
		id := make([]byte, 16)
		rand.Read(id)
		return peer.ID(id)
	}

	// bootstrap sequentially, as results will compound
	runQuery := func(ctx context.Context, id peer.ID) {
		p, err := dht.FindPeer(ctx, id)
		if err == routing.ErrNotFound {
			// this isn't an error. this is precisely what we expect.
		} else if err != nil {
			merr = append(merr, err)
		} else {
			// woah, actually found a peer with that ID? this shouldn't happen normally
			// (as the ID we use is not a real ID). this is an odd error worth logging.
			err := fmt.Errorf("Bootstrap peer error: Actually FOUND peer. (%s, %s)", id, p)
			log.Errorf("%s", err)
			merr = append(merr, err)
		}
	}

	sequential := true
	if sequential {
		// these should be parallel normally. but can make them sequential for debugging.
		// note that the core/bootstrap context deadline should be extended too for that.
		for i := 0; i < queries; i++ {
			id := randomID()
			log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, queries, id)
			runQuery(ctx, id)
		}

	} else {
		// note on parallelism here: the context is passed in to the queries, so they
		// **should** exit when it exceeds, making this function exit on ctx cancel.
		// normally, we should be selecting on ctx.Done() here too, but this gets
		// complicated to do with WaitGroup, and doesn't wait for the children to exit.
		var wg sync.WaitGroup
		for i := 0; i < queries; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()

				id := randomID()
				log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, queries, id)
				runQuery(ctx, id)
			}()
		}
		wg.Wait()
	}

	if len(merr) > 0 {
		return merr
	}
	return nil
}
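The hunk above is the caller-facing half of this commit: the one-shot Bootstrap(ctx, queries) method is removed and replaced by the periodic variant added in dht_bootstrap.go below. A rough, hedged sketch of what that change means for a caller (bootstrapCallerSketch is a hypothetical helper in package dht, not part of this diff):

// bootstrapCallerSketch is illustrative only; it is not part of this commit.
func bootstrapCallerSketch(dht *IpfsDHT) error {
	// Old API (removed above): one synchronous round of random-ID lookups,
	// with the caller choosing the context and query count, e.g.
	//
	//     err := dht.Bootstrap(ctx, 16)
	//
	// New API (added in dht_bootstrap.go below): runs one round immediately,
	// then keeps bootstrapping every DefaultBootstrapPeriod until the
	// returned process is closed.
	proc, err := dht.Bootstrap()
	if err != nil {
		return err
	}
	return proc.Close() // stop the periodic bootstrapper
}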
dht_bootstrap.go (new file, 181 lines)
@@ -0,0 +1,181 @@
// Package dht implements a distributed hash table that satisfies the ipfs routing
// interface. This DHT is modeled after kademlia with Coral and S/Kademlia modifications.
package dht

import (
	"crypto/rand"
	"fmt"
	"sync"
	"time"

	peer "github.com/jbenet/go-ipfs/p2p/peer"
	routing "github.com/jbenet/go-ipfs/routing"
	u "github.com/jbenet/go-ipfs/util"

	context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
	goprocess "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
)

// DefaultBootstrapQueries specifies how many queries to run,
// if the user does not specify a different number as an option.
//
// For now, this is set to 16 queries, which is an aggressive number.
// We are currently more interested in ensuring we have a properly formed
// DHT than making sure our dht minimizes traffic. Once we are more certain
// of our implementation's robustness, we should lower this down to 8 or 4.
//
// Note there is also a tradeoff between the bootstrap period and the number
// of queries. We could support a higher period with a smaller number of
// queries.
const DefaultBootstrapQueries = 16

// DefaultBootstrapPeriod specifies how often to periodically run bootstrap,
// if the user does not specify a different number as an option.
//
// For now, this is set to 10 seconds, which is an aggressive period. We are
// currently more interested in ensuring we have a properly formed DHT than
// making sure our dht minimizes traffic. Once we are more certain of our
// implementation's robustness, we should lower this down to 30s or 1m.
//
// Note there is also a tradeoff between the bootstrap period and the number
// of queries. We could support a higher period with a smaller number of
// queries.
const DefaultBootstrapPeriod = time.Duration(10 * time.Second)

// Bootstrap runs bootstrapping once, then calls BootstrapOnSignal with default
// parameters: DefaultBootstrapQueries and DefaultBootstrapPeriod. This allows
// the user to catch an error off the bat if the connections are faulty. It also
// allows BootstrapOnSignal not to run bootstrap at the beginning, which is useful
// for instrumenting it on tests, or delaying bootstrap until the network is online
// and connected to at least a few nodes.
//
// Like BootstrapOnSignal, Bootstrap returns a process, so the user can stop it.
func (dht *IpfsDHT) Bootstrap() (goprocess.Process, error) {

	if err := dht.runBootstrap(dht.Context(), DefaultBootstrapQueries); err != nil {
		return nil, err
	}

	sig := time.Tick(DefaultBootstrapPeriod)
	return dht.BootstrapOnSignal(DefaultBootstrapQueries, sig)
}

// BootstrapOnSignal ensures the dht routing table remains healthy as peers come and go.
// It builds up a list of peers by requesting random peer IDs. The bootstrap
// process will run a number of queries each time, and run every time signal fires.
// These parameters are configurable.
//
// BootstrapOnSignal returns a process, so the user can stop it.
func (dht *IpfsDHT) BootstrapOnSignal(queries int, signal <-chan time.Time) (goprocess.Process, error) {
	if queries <= 0 {
		return nil, fmt.Errorf("invalid number of queries: %d", queries)
	}

	if signal == nil {
		return nil, fmt.Errorf("invalid signal: %v", signal)
	}

	proc := goprocess.Go(func(worker goprocess.Process) {
		for {
			select {
			case <-worker.Closing():
				log.Debug("dht bootstrapper shutting down")
				return

			case <-signal:
				// it would be useful to be able to send out signals of when we bootstrap, too...
				// maybe this is a good case for whole module event pub/sub?

				ctx := dht.Context()
				if err := dht.runBootstrap(ctx, queries); err != nil {
					log.Error(err)
					// A bootstrapping error is important to notice but not fatal.
					// maybe the client should be able to consume these errors,
					// though I don't have a clear use case in mind-- what **could**
					// the client do if one of the bootstrap calls fails?
					//
					// This is also related to the core's bootstrap failures.
					// superviseConnections should perhaps allow clients to detect
					// bootstrapping problems.
					//
					// Anyway, passing errors could be done with a bootstrapper object.
					// this would imply the client should be able to consume a lot of
					// other non-fatal dht errors too. providing this functionality
					// should be done correctly DHT-wide.
					// NB: whatever the design, clients must ensure they drain errors!
					// This pattern is common to many things, perhaps long-running services
					// should have something like an ErrStream that allows clients to consume
					// periodic errors and take action. It should allow the user to also
					// ignore all errors with something like an ErrStreamDiscard. We should
					// study what other systems do for ideas.
				}
			}
		}
	})

	return proc, nil
}

// runBootstrap builds up list of peers by requesting random peer IDs
func (dht *IpfsDHT) runBootstrap(ctx context.Context, queries int) error {

	var merr u.MultiErr

	randomID := func() peer.ID {
		// 16 random bytes is not a valid peer id. it may be fine because
		// the dht will rehash to its own keyspace anyway.
		id := make([]byte, 16)
		rand.Read(id)
		return peer.ID(id)
	}

	// bootstrap sequentially, as results will compound
	runQuery := func(ctx context.Context, id peer.ID) {
		p, err := dht.FindPeer(ctx, id)
		if err == routing.ErrNotFound {
			// this isn't an error. this is precisely what we expect.
		} else if err != nil {
			merr = append(merr, err)
		} else {
			// woah, actually found a peer with that ID? this shouldn't happen normally
			// (as the ID we use is not a real ID). this is an odd error worth logging.
			err := fmt.Errorf("Bootstrap peer error: Actually FOUND peer. (%s, %s)", id, p)
			log.Errorf("%s", err)
			merr = append(merr, err)
		}
	}

	sequential := true
	if sequential {
		// these should be parallel normally. but can make them sequential for debugging.
		// note that the core/bootstrap context deadline should be extended too for that.
		for i := 0; i < queries; i++ {
			id := randomID()
			log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, queries, id)
			runQuery(ctx, id)
		}

	} else {
		// note on parallelism here: the context is passed in to the queries, so they
		// **should** exit when it exceeds, making this function exit on ctx cancel.
		// normally, we should be selecting on ctx.Done() here too, but this gets
		// complicated to do with WaitGroup, and doesn't wait for the children to exit.
		var wg sync.WaitGroup
		for i := 0; i < queries; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()

				id := randomID()
				log.Debugf("Bootstrapping query (%d/%d) to random ID: %s", i+1, queries, id)
				runQuery(ctx, id)
			}()
		}
		wg.Wait()
	}

	if len(merr) > 0 {
		return merr
	}
	return nil
}
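To make the intended use of the two new entry points concrete, here is a minimal usage sketch. It is written as a hypothetical function inside package dht (so no import paths need to be assumed); periodicBootstrapSketch is not part of this commit.

// periodicBootstrapSketch is illustrative only; it is not part of this commit.
func periodicBootstrapSketch(dht *IpfsDHT) error {
	// Default behaviour: one round runs right away (so the caller sees
	// connection problems immediately), then a round every DefaultBootstrapPeriod.
	proc, err := dht.Bootstrap()
	if err != nil {
		return err
	}
	defer proc.Close() // shuts the periodic bootstrapper down

	// Instrumented behaviour, as TestPeriodicBootstrap below uses it: the caller
	// owns the signal channel, and every value sent on it triggers one round of
	// `queries` random-ID lookups.
	sig := make(chan time.Time)
	onSig, err := dht.BootstrapOnSignal(DefaultBootstrapQueries, sig)
	if err != nil {
		return err
	}
	defer onSig.Close()

	sig <- time.Now() // run exactly one bootstrap round, on the caller's schedule
	return nil
}

Driving bootstrap from a caller-owned channel is what lets the tests below trigger rounds deterministically instead of waiting on the ten-second default timer.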
dht_test.go (184 changed lines)
@@ -75,25 +75,20 @@ func connect(t *testing.T, ctx context.Context, a, b *IpfsDHT) {

func bootstrap(t *testing.T, ctx context.Context, dhts []*IpfsDHT) {

	ctx, cancel := context.WithCancel(ctx)
	log.Error("hmm")
	defer log.Error("hmm end")
	log.Debugf("bootstrapping dhts...")

	rounds := 1
	// tried async. sequential fares much better. compare:
	// 100 async https://gist.github.com/jbenet/56d12f0578d5f34810b2
	// 100 sync https://gist.github.com/jbenet/6c59e7c15426e48aaedd
	// probably because results compound

	for i := 0; i < rounds; i++ {
		log.Debugf("bootstrapping round %d/%d\n", i, rounds)

		// tried async. sequential fares much better. compare:
		// 100 async https://gist.github.com/jbenet/56d12f0578d5f34810b2
		// 100 sync https://gist.github.com/jbenet/6c59e7c15426e48aaedd
		// probably because results compound

		start := rand.Intn(len(dhts)) // randomize to decrease bias.
		for i := range dhts {
			dht := dhts[(start+i)%len(dhts)]
			log.Debugf("bootstrapping round %d/%d -- %s\n", i, rounds, dht.self)
			dht.Bootstrap(ctx, 3)
		}
	start := rand.Intn(len(dhts)) // randomize to decrease bias.
	for i := range dhts {
		dht := dhts[(start+i)%len(dhts)]
		dht.runBootstrap(ctx, 3)
	}

	cancel()
}
@@ -235,6 +230,53 @@ func TestProvides(t *testing.T) {
	}
}

// if minPeers or avgPeers is 0, don't test for it.
func waitForWellFormedTables(t *testing.T, dhts []*IpfsDHT, minPeers, avgPeers int, timeout time.Duration) bool {
	// test "well-formed-ness" (>= minPeers peers in every routing table)

	checkTables := func() bool {
		totalPeers := 0
		for _, dht := range dhts {
			rtlen := dht.routingTable.Size()
			totalPeers += rtlen
			if minPeers > 0 && rtlen < minPeers {
				t.Logf("routing table for %s only has %d peers (should have >%d)", dht.self, rtlen, minPeers)
				return false
			}
		}
		actualAvgPeers := totalPeers / len(dhts)
		t.Logf("avg rt size: %d", actualAvgPeers)
		if avgPeers > 0 && actualAvgPeers < avgPeers {
			t.Logf("avg rt size: %d < %d", actualAvgPeers, avgPeers)
			return false
		}
		return true
	}

	timeoutA := time.After(timeout)
	for {
		select {
		case <-timeoutA:
			log.Errorf("did not reach well-formed routing tables by %s", timeout)
			return false // failed
		case <-time.After(5 * time.Millisecond):
			if checkTables() {
				return true // succeeded
			}
		}
	}
}

func printRoutingTables(dhts []*IpfsDHT) {
	// the routing tables should be full now. let's inspect them.
	fmt.Printf("checking routing tables of %d dhts\n", len(dhts))
	for _, dht := range dhts {
		fmt.Printf("checking routing table of %s\n", dht.self)
		dht.routingTable.Print()
		fmt.Println("")
	}
}

func TestBootstrap(t *testing.T) {
	// t.Skip("skipping test to debug another")
	if testing.Short() {
@@ -258,38 +300,105 @@ func TestBootstrap(t *testing.T) {
	}

	<-time.After(100 * time.Millisecond)
	t.Logf("bootstrapping %d dhts so they find each other", nDHTs)
	ctxT, _ := context.WithTimeout(ctx, 5*time.Second)
	bootstrap(t, ctxT, dhts)
	// bootstrap a few times until we get good tables.
	stop := make(chan struct{})
	go func() {
		for {
			t.Logf("bootstrapping %d dhts so they find each other", nDHTs)
			ctxT, _ := context.WithTimeout(ctx, 5*time.Second)
			bootstrap(t, ctxT, dhts)

			select {
			case <-time.After(50 * time.Millisecond):
				continue // being explicit
			case <-stop:
				return
			}
		}
	}()

	waitForWellFormedTables(t, dhts, 7, 10, 5*time.Second)
	close(stop)

	if u.Debug {
		// the routing tables should be full now. let's inspect them.
		<-time.After(5 * time.Second)
		t.Logf("checking routing table of %d", nDHTs)
		for _, dht := range dhts {
			fmt.Printf("checking routing table of %s\n", dht.self)
			dht.routingTable.Print()
			fmt.Println("")
		printRoutingTables(dhts)
	}
}

func TestPeriodicBootstrap(t *testing.T) {
	// t.Skip("skipping test to debug another")
	if testing.Short() {
		t.SkipNow()
	}

	ctx := context.Background()

	nDHTs := 30
	_, _, dhts := setupDHTS(ctx, nDHTs, t)
	defer func() {
		for i := 0; i < nDHTs; i++ {
			dhts[i].Close()
			defer dhts[i].host.Close()
		}
	}()

	// signal amplifier
	amplify := func(signal chan time.Time, other []chan time.Time) {
		for t := range signal {
			for _, s := range other {
				s <- t
			}
		}
		for _, s := range other {
			close(s)
		}
	}

	// test "well-formed-ness" (>= 3 peers in every routing table)
	avgsize := 0
	signal := make(chan time.Time)
	allSignals := []chan time.Time{}

	// kick off periodic bootstrappers with instrumented signals.
	for _, dht := range dhts {
		s := make(chan time.Time)
		allSignals = append(allSignals, s)
		dht.BootstrapOnSignal(5, s)
	}
	go amplify(signal, allSignals)

	t.Logf("%d dhts are not connected", nDHTs)
	for _, dht := range dhts {
		rtlen := dht.routingTable.Size()
		avgsize += rtlen
		t.Logf("routing table for %s has %d peers", dht.self, rtlen)
		if rtlen < 4 {
			// currently, we don't have good bootstrapping guarantees.
			// t.Errorf("routing table for %s only has %d peers", dht.self, rtlen)
			if rtlen > 0 {
				t.Errorf("routing table for %s should have 0 peers. has %d", dht.self, rtlen)
			}
		}
	avgsize = avgsize / len(dhts)
	avgsizeExpected := 6

	t.Logf("avg rt size: %d", avgsize)
	if avgsize < avgsizeExpected {
		t.Errorf("avg rt size: %d < %d", avgsize, avgsizeExpected)
		for i := 0; i < nDHTs; i++ {
			connect(t, ctx, dhts[i], dhts[(i+1)%len(dhts)])
		}

	t.Logf("%d dhts are now connected to 1-2 others", nDHTs)
	for _, dht := range dhts {
		rtlen := dht.routingTable.Size()
		if rtlen > 2 {
			t.Errorf("routing table for %s should have at most 2 peers. has %d", dht.self, rtlen)
		}
	}

	if u.Debug {
		printRoutingTables(dhts)
	}

	t.Logf("bootstrapping %d dhts so they find each other", nDHTs)
	signal <- time.Now()

	// this is async, and we don't know when it's finished with one cycle, so keep checking
	// until the routing tables look better, or some long timeout for the failure case.
	waitForWellFormedTables(t, dhts, 7, 10, 5*time.Second)

	if u.Debug {
		printRoutingTables(dhts)
	}
}
@@ -319,7 +428,6 @@ func TestProvidesMany(t *testing.T) {

	if u.Debug {
		// the routing tables should be full now. let's inspect them.
		<-time.After(5 * time.Second)
		t.Logf("checking routing table of %d", nDHTs)
		for _, dht := range dhts {
			fmt.Printf("checking routing table of %s\n", dht.self)