go-libp2p-kad-dht/dht_bootstrap.go

package dht

import (
	"context"
	"fmt"
	"time"

	multierror "github.com/hashicorp/go-multierror"
	process "github.com/jbenet/goprocess"
	processctx "github.com/jbenet/goprocess/context"
	"github.com/libp2p/go-libp2p-core/routing"
	"github.com/multiformats/go-multiaddr"
	_ "github.com/multiformats/go-multiaddr-dns"
)

var DefaultBootstrapPeers []multiaddr.Multiaddr

// Minimum number of peers in the routing table. If we drop below this and we
// see a new peer, we trigger a bootstrap round.
var minRTRefreshThreshold = 4

func init() {
	for _, s := range []string{
		"/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN",
		"/dnsaddr/bootstrap.libp2p.io/p2p/QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa",
		"/dnsaddr/bootstrap.libp2p.io/p2p/QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb",
		"/dnsaddr/bootstrap.libp2p.io/p2p/QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt",
		"/ip4/104.131.131.82/tcp/4001/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ", // mars.i.ipfs.io
	} {
		ma, err := multiaddr.NewMultiaddr(s)
		if err != nil {
			panic(err)
		}
		DefaultBootstrapPeers = append(DefaultBootstrapPeers, ma)
	}
}

// startRefreshing starts the refresh worker.
func (dht *IpfsDHT) startRefreshing() error {
	// Periodically scan the routing table and do a random walk for every cpl
	// that hasn't been queried within the refresh period.
	dht.proc.Go(func(proc process.Process) {
		ctx := processctx.OnClosingContext(proc)

		refreshTicker := time.NewTicker(dht.rtRefreshPeriod)
		defer refreshTicker.Stop()

		// Do an initial refresh if auto-refresh is enabled.
		if dht.autoRefresh {
			dht.doRefresh(ctx)
		} else {
			// disable the "auto-refresh" ticker so that no more ticks are sent to this channel
			refreshTicker.Stop()
		}

		for {
			var waiting []chan<- error
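			// Wait for the next scheduled refresh, a manual trigger, or shutdown.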
			select {
			case <-refreshTicker.C:
			case res := <-dht.triggerRtRefresh:
				if res != nil {
					waiting = append(waiting, res)
				}
			case <-ctx.Done():
				return
			}

			// Batch multiple refresh requests if they're all waiting at the same time.
		collectWaiting:
			for {
				select {
				case res := <-dht.triggerRtRefresh:
					if res != nil {
						waiting = append(waiting, res)
					}
				default:
					break collectWaiting
				}
			}
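			// Run the refresh, then hand the result to every caller that triggered
			// (or joined) this round and close their channels.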
			err := dht.doRefresh(ctx)
			for _, w := range waiting {
				w <- err
				close(w)
			}
			if err != nil {
				logger.Warning(err)
			}
		}
	})

	return nil
}
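
// doRefresh walks towards our own peer ID and then refreshes every tracked
// cpl, collecting any errors into a single multierror.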
func (dht *IpfsDHT) doRefresh(ctx context.Context) error {
	var merr error
	if err := dht.selfWalk(ctx); err != nil {
		merr = multierror.Append(merr, err)
	}
	if err := dht.refreshCpls(ctx); err != nil {
		merr = multierror.Append(merr, err)
	}
	return merr
}

// refreshCpls scans the routing table and does a random walk for every cpl
// that hasn't been queried within the refresh period.
func (dht *IpfsDHT) refreshCpls(ctx context.Context) error {
	doQuery := func(cpl uint, target string, f func(context.Context) error) error {
		logger.Infof("starting refreshing cpl %d to %s (routing table size was %d)",
			cpl, target, dht.routingTable.Size())
		defer func() {
			logger.Infof("finished refreshing cpl %d to %s (routing table size is now %d)",
				cpl, target, dht.routingTable.Size())
		}()

		queryCtx, cancel := context.WithTimeout(ctx, dht.rtRefreshQueryTimeout)
		defer cancel()
		err := f(queryCtx)
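		// Hitting the per-query timeout is expected during a refresh; only propagate
		// the deadline error if the outer refresh context was also cancelled or expired.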
		if err == context.DeadlineExceeded && queryCtx.Err() == context.DeadlineExceeded && ctx.Err() == nil {
			return nil
		}
		return err
	}

	trackedCpls := dht.routingTable.GetTrackedCplsForRefresh()
	var merr error
	for _, tcpl := range trackedCpls {
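		// Skip cpls that were refreshed within the current refresh period.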
		if time.Since(tcpl.LastRefreshAt) <= dht.rtRefreshPeriod {
			continue
		}

		// Generate a random peer ID that falls into the bucket for this cpl.
		randPeer, err := dht.routingTable.GenRandPeerID(tcpl.Cpl)
		if err != nil {
			logger.Errorf("failed to generate peerID for cpl %d, err: %s", tcpl.Cpl, err)
			continue
		}

		// Walk towards the generated peer.
		walkFnc := func(c context.Context) error {
			_, err := dht.FindPeer(c, randPeer)
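			// The random peer almost certainly doesn't exist, so ErrNotFound is the
			// expected (successful) outcome; the lookup itself is what fills the bucket.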
			if err == routing.ErrNotFound {
				return nil
			}
			return err
		}

		if err := doQuery(tcpl.Cpl, randPeer.String(), walkFnc); err != nil {
			merr = multierror.Append(
				merr,
				fmt.Errorf("failed to do a random walk for cpl %d: %s", tcpl.Cpl, err),
			)
		}
	}
	return merr
}

// selfWalk traverses the DHT toward our own peer ID.
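// A self-lookup never succeeds (we never "find" ourselves), so routing.ErrNotFound
// is the expected result; the value of the walk is the peers we discover and add to
// the routing table along the way.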
func (dht *IpfsDHT) selfWalk(ctx context.Context) error {
	queryCtx, cancel := context.WithTimeout(ctx, dht.rtRefreshQueryTimeout)
	defer cancel()
	_, err := dht.FindPeer(queryCtx, dht.self)
	if err == routing.ErrNotFound {
		return nil
	}
	return fmt.Errorf("failed to query self during routing table refresh: %s", err)
}

// Bootstrap tells the DHT to get into a bootstrapped state satisfying the
// IpfsRouter interface.
//
// This just calls `RefreshRoutingTable`.
func (dht *IpfsDHT) Bootstrap(_ context.Context) error {
	dht.RefreshRoutingTable()
	return nil
}

// RefreshRoutingTable tells the DHT to refresh its routing tables.
//
// The returned channel will block until the refresh finishes, then yield the
// error and close. The channel is buffered and safe to ignore.
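//
// Illustrative usage (callers that don't care about the result can simply
// drop the returned channel):
//
//	if err := <-dht.RefreshRoutingTable(); err != nil {
//		// handle or log the refresh error
//	}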
func (dht *IpfsDHT) RefreshRoutingTable() <-chan error {
	res := make(chan error, 1)
	dht.triggerRtRefresh <- res
	return res
}