RT connectivity changes

Aarsh Shah 2020-03-06 18:27:36 +05:30 committed by Steven Allen
parent 7ada018b2a
commit fbb1b3668a
12 changed files with 276 additions and 191 deletions

dht.go

@@ -36,6 +36,7 @@ import (
 )

 var logger = logging.Logger("dht")
+var rtPvLogger = logging.Logger("dht/rt/peer-validation")

 const BaseConnMgrScore = 5
@@ -96,6 +97,7 @@ type IpfsDHT struct {
 	rtRefreshQueryTimeout time.Duration
 	rtRefreshPeriod       time.Duration
 	triggerRtRefresh      chan chan<- error
+	triggerSelfLookup     chan chan<- error

 	maxRecordAge time.Duration
@@ -127,8 +129,11 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
 	if err := cfg.validate(); err != nil {
 		return nil, err
 	}
-	dht := makeDHT(ctx, h, cfg)
+	dht, err := makeDHT(ctx, h, cfg)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create DHT, err=%s", err)
+	}
 	dht.autoRefresh = cfg.routingTable.autoRefresh
 	dht.rtRefreshPeriod = cfg.routingTable.refreshPeriod
 	dht.rtRefreshQueryTimeout = cfg.routingTable.refreshQueryTimeout
@@ -168,6 +173,7 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
 	// handle providers
 	dht.proc.AddChild(dht.providers.Process())

+	dht.startSelfLookup()
 	dht.startRefreshing()
 	return dht, nil
 }
@@ -195,18 +201,10 @@ func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT
 	return dht
 }

-func makeDHT(ctx context.Context, h host.Host, cfg config) *IpfsDHT {
-	self := kb.ConvertPeerID(h.ID())
-	rt := kb.NewRoutingTable(cfg.bucketSize, self, cfg.routingTable.latencyTolerance, h.Peerstore())
-	cmgr := h.ConnManager()
-	rt.PeerAdded = func(p peer.ID) {
-		commonPrefixLen := kb.CommonPrefixLen(self, kb.ConvertPeerID(p))
-		cmgr.TagPeer(p, "kbucket", BaseConnMgrScore+commonPrefixLen)
-	}
-	rt.PeerRemoved = func(p peer.ID) {
-		cmgr.UntagPeer(p, "kbucket")
-	}
+func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
+	rt, err := makeRoutingTable(h, cfg)
+	if err != nil {
+		return nil, fmt.Errorf("failed to construct routing table,err=%s", err)
+	}

 	protocols := []protocol.ID{cfg.protocolPrefix + kad2}
@@ -237,6 +235,7 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) *IpfsDHT {
 		alpha:             cfg.concurrency,
 		d:                 cfg.disjointPaths,
 		triggerRtRefresh:  make(chan chan<- error),
+		triggerSelfLookup: make(chan chan<- error),
 	}

 	// create a DHT proc with the given context
@@ -249,43 +248,39 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) *IpfsDHT {
 	dht.providers = providers.NewProviderManager(dht.ctx, h.ID(), cfg.datastore)

-	return dht
+	return dht, nil
 }

-// TODO Implement RT seeding as described in https://github.com/libp2p/go-libp2p-kad-dht/pull/384#discussion_r320994340 OR
-// come up with an alternative solution.
-// issue is being tracked at https://github.com/libp2p/go-libp2p-kad-dht/issues/387
-/*func (dht *IpfsDHT) rtRecovery(proc goprocess.Process) {
-	writeResp := func(errorChan chan error, err error) {
-		select {
-		case <-proc.Closing():
-		case errorChan <- errChan:
-		}
-		close(errorChan)
-	}
-
-	for {
-		select {
-		case req := <-dht.rtRecoveryChan:
-			if dht.routingTable.Size() == 0 {
-				logger.Infof("rt recovery proc: received request with reqID=%s, RT is empty. initiating recovery", req.id)
-				// TODO Call Seeder with default bootstrap peers here once #383 is merged
-				if dht.routingTable.Size() > 0 {
-					logger.Infof("rt recovery proc: successfully recovered RT for reqID=%s, RT size is now %d", req.id, dht.routingTable.Size())
-					go writeResp(req.errorChan, nil)
-				} else {
-					logger.Errorf("rt recovery proc: failed to recover RT for reqID=%s, RT is still empty", req.id)
-					go writeResp(req.errorChan, errors.New("RT empty after seed attempt"))
-				}
-			} else {
-				logger.Infof("rt recovery proc: RT is not empty, no need to act on request with reqID=%s", req.id)
-				go writeResp(req.errorChan, nil)
-			}
-		case <-proc.Closing():
-			return
-		}
-	}
-}*/
+func makeRoutingTable(h host.Host, cfg config) (*kb.RoutingTable, error) {
+	self := kb.ConvertPeerID(h.ID())
+	// construct the routing table with a peer validation function
+	pvF := func(c context.Context, p peer.ID) bool {
+		if err := h.Connect(c, peer.AddrInfo{ID: p}); err != nil {
+			rtPvLogger.Errorf("failed to connect to peer %s for validation, err=%s", p, err)
+			return false
+		}
+		return true
+	}
+
+	rtOpts := []kb.Option{kb.PeerValidationFnc(pvF)}
+	if !(cfg.routingTable.checkInterval == 0) {
+		rtOpts = append(rtOpts, kb.TableCleanupInterval(cfg.routingTable.checkInterval))
+	}
+
+	rt, err := kb.NewRoutingTable(cfg.bucketSize, self, time.Minute, h.Peerstore(),
+		rtOpts...)
+	cmgr := h.ConnManager()
+	rt.PeerAdded = func(p peer.ID) {
+		commonPrefixLen := kb.CommonPrefixLen(self, kb.ConvertPeerID(p))
+		cmgr.TagPeer(p, "kbucket", BaseConnMgrScore+commonPrefixLen)
+	}
+	rt.PeerRemoved = func(p peer.ID) {
+		cmgr.UntagPeer(p, "kbucket")
+	}
+
+	return rt, err
+}

 // putValueToPeer stores the given key/value pair at the peer 'p'
 func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID, rec *recpb.Record) error {
@@ -398,11 +393,26 @@ func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error {
 	return dht.datastore.Put(mkDsKey(key), data)
 }

-// Update signals the routingTable to Update its last-seen status
-// on the given peer.
-func (dht *IpfsDHT) Update(ctx context.Context, p peer.ID) {
-	logger.Event(ctx, "updatePeer", p)
-	dht.routingTable.Update(p)
+// peerFound signals the routingTable that we've found a peer that
+// supports the DHT protocol.
+func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID) {
+	logger.Event(ctx, "peerFound", p)
+	dht.routingTable.HandlePeerAlive(p)
+}
+
+// peerStoppedDHT signals the routing table that a peer has stopped supporting the DHT protocol.
+func (dht *IpfsDHT) peerStoppedDHT(ctx context.Context, p peer.ID) {
+	logger.Event(ctx, "peerStoppedDHT", p)
+	// A peer that does not support the DHT protocol is dead for us.
+	// There's no point in talking to anymore till it starts supporting the DHT protocol again.
+	dht.routingTable.HandlePeerDead(p)
+}
+
+// peerDisconnected signals the routing table that a peer is not connected anymore.
+func (dht *IpfsDHT) peerDisconnected(ctx context.Context, p peer.ID) {
+	logger.Event(ctx, "peerDisconnected", p)
+	dht.routingTable.HandlePeerDisconnect(p)
 }

 // FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.


@@ -9,6 +9,7 @@ import (
 	process "github.com/jbenet/goprocess"
 	processctx "github.com/jbenet/goprocess/context"
 	"github.com/libp2p/go-libp2p-core/routing"
+	kbucket "github.com/libp2p/go-libp2p-kbucket"
 	"github.com/multiformats/go-multiaddr"
 	_ "github.com/multiformats/go-multiaddr-dns"
 )
@@ -17,7 +18,7 @@ var DefaultBootstrapPeers []multiaddr.Multiaddr

 // Minimum number of peers in the routing table. If we drop below this and we
 // see a new peer, we trigger a bootstrap round.
-var minRTRefreshThreshold = 4
+var minRTRefreshThreshold = 10

 func init() {
 	for _, s := range []string{
@@ -35,6 +36,50 @@ func init() {
 	}
 }

+// startSelfLookup starts a go-routine that listens for requests to trigger a self walk on a dedicated channel
+// and then sends the error status back on the error channel sent along with the request.
+// if multiple callers "simultaneously" ask for a self walk, it performs ONLY one self walk and sends the same error status to all of them.
+func (dht *IpfsDHT) startSelfLookup() error {
+	dht.proc.Go(func(proc process.Process) {
+		ctx := processctx.OnClosingContext(proc)
+		for {
+			var waiting []chan<- error
+			select {
+			case res := <-dht.triggerSelfLookup:
+				if res != nil {
+					waiting = append(waiting, res)
+				}
+			case <-ctx.Done():
+				return
+			}
+
+			// batch multiple refresh requests if they're all waiting at the same time.
+			waiting = append(waiting, collectWaitingChannels(dht.triggerSelfLookup)...)
+
+			// Do a self walk
+			queryCtx, cancel := context.WithTimeout(ctx, dht.rtRefreshQueryTimeout)
+			_, err := dht.FindPeer(queryCtx, dht.self)
+			if err == routing.ErrNotFound || err == kbucket.ErrLookupFailure {
+				err = nil
+			} else if err != nil {
+				err = fmt.Errorf("failed to query self during routing table refresh: %s", err)
+			}
+			cancel()
+
+			// send back the error status
+			for _, w := range waiting {
+				w <- err
+				close(w)
+			}
+			if err != nil {
+				logger.Warning(err)
+			}
+		}
+	})
+
+	return nil
+}
+
 // Start the refresh worker.
 func (dht *IpfsDHT) startRefreshing() error {
 	// scan the RT table periodically & do a random walk for cpl's that haven't been queried since the given period
@@ -65,17 +110,7 @@ func (dht *IpfsDHT) startRefreshing() error {
 			}

 			// Batch multiple refresh requests if they're all waiting at the same time.
-		collectWaiting:
-			for {
-				select {
-				case res := <-dht.triggerRtRefresh:
-					if res != nil {
-						waiting = append(waiting, res)
-					}
-				default:
-					break collectWaiting
-				}
-			}
+			waiting = append(waiting, collectWaitingChannels(dht.triggerRtRefresh)...)

 			err := dht.doRefresh(ctx)
 			for _, w := range waiting {
@@ -91,11 +126,41 @@ func (dht *IpfsDHT) startRefreshing() error {
 	return nil
 }

+func collectWaitingChannels(source chan chan<- error) []chan<- error {
+	var waiting []chan<- error
+	for {
+		select {
+		case res := <-source:
+			if res != nil {
+				waiting = append(waiting, res)
+			}
+		default:
+			return waiting
+		}
+	}
+}
+
 func (dht *IpfsDHT) doRefresh(ctx context.Context) error {
 	var merr error
-	if err := dht.selfWalk(ctx); err != nil {
-		merr = multierror.Append(merr, err)
-	}
+
+	// wait for the self walk result
+	selfWalkres := make(chan error, 1)
+	select {
+	case dht.triggerSelfLookup <- selfWalkres:
+	case <-ctx.Done():
+		return ctx.Err()
+	}
+
+	select {
+	case err := <-selfWalkres:
+		if err != nil {
+			merr = multierror.Append(merr, err)
+		}
+	case <-ctx.Done():
+		return ctx.Err()
+	}
+
 	if err := dht.refreshCpls(ctx); err != nil {
 		merr = multierror.Append(merr, err)
 	}
@@ -127,6 +192,12 @@ func (dht *IpfsDHT) refreshCpls(ctx context.Context) error {
 		if time.Since(tcpl.LastRefreshAt) <= dht.rtRefreshPeriod {
 			continue
 		}
+
+		// do not refresh if bucket is full
+		if dht.routingTable.IsBucketFull(tcpl.Cpl) {
+			continue
+		}
+
 		// gen rand peer with the cpl
 		randPeer, err := dht.routingTable.GenRandPeerID(tcpl.Cpl)
 		if err != nil {
@@ -153,17 +224,6 @@ func (dht *IpfsDHT) refreshCpls(ctx context.Context) error {
 	return merr
 }

-// Traverse the DHT toward the self ID
-func (dht *IpfsDHT) selfWalk(ctx context.Context) error {
-	queryCtx, cancel := context.WithTimeout(ctx, dht.rtRefreshQueryTimeout)
-	defer cancel()
-	_, err := dht.FindPeer(queryCtx, dht.self)
-	if err == routing.ErrNotFound {
-		return nil
-	}
-	return fmt.Errorf("failed to query self during routing table refresh: %s", err)
-}
-
 // Bootstrap tells the DHT to get into a bootstrapped state satisfying the
 // IpfsRouter interface.
 //
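Editor's note: the trigger-and-wait pattern that doRefresh uses above can be read in isolation. The following is an illustrative sketch only, not part of this commit; the helper name requestSelfWalk is hypothetical, and it is assumed to live in package dht next to the code above (with "context" imported), using the triggerSelfLookup channel introduced here.

// requestSelfWalk is a hypothetical helper sketching the calling side of
// dht.triggerSelfLookup, mirroring what doRefresh does above.
func (dht *IpfsDHT) requestSelfWalk(ctx context.Context) error {
	// buffered so the worker's send never blocks if we bail out early
	res := make(chan error, 1)

	// ask the startSelfLookup worker to run (or join) a self walk
	select {
	case dht.triggerSelfLookup <- res:
	case <-ctx.Done():
		return ctx.Err()
	}

	// wait for the worker to report the outcome; it closes res after sending
	select {
	case err := <-res:
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}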

dht_bootstrap_test.go (new file)

@@ -0,0 +1,49 @@
+package dht
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/libp2p/go-libp2p-core/event"
+	kb "github.com/libp2p/go-libp2p-kbucket"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestSelfWalkOnAddressChange(t *testing.T) {
+	ctx := context.Background()
+	// create three DHT instances with auto refresh disabled
+	d1 := setupDHT(ctx, t, false, DisableAutoRefresh())
+	d2 := setupDHT(ctx, t, false, DisableAutoRefresh())
+	d3 := setupDHT(ctx, t, false, DisableAutoRefresh())
+
+	var connectedTo *IpfsDHT
+	// connect d1 to whoever is "further"
+	if kb.CommonPrefixLen(kb.ConvertPeerID(d1.self), kb.ConvertPeerID(d2.self)) <=
+		kb.CommonPrefixLen(kb.ConvertPeerID(d1.self), kb.ConvertPeerID(d3.self)) {
+		connect(t, ctx, d1, d3)
+		connectedTo = d3
+	} else {
+		connect(t, ctx, d1, d2)
+		connectedTo = d2
+	}
+
+	// then connect d2 AND d3
+	connect(t, ctx, d2, d3)
+
+	// d1 should have ONLY 1 peer in it's RT
+	waitForWellFormedTables(t, []*IpfsDHT{d1}, 1, 1, 2*time.Second)
+	require.Equal(t, connectedTo.self, d1.routingTable.ListPeers()[0])
+
+	// now emit the address change event
+	em, err := d1.host.EventBus().Emitter(&event.EvtLocalAddressesUpdated{})
+	require.NoError(t, err)
+	require.NoError(t, em.Emit(event.EvtLocalAddressesUpdated{}))
+	waitForWellFormedTables(t, []*IpfsDHT{d1}, 2, 2, 2*time.Second)
+
+	// it should now have both peers in the RT
+	ps := d1.routingTable.ListPeers()
+	require.Contains(t, ps, d2.self)
+	require.Contains(t, ps, d3.self)
+}


@@ -174,7 +174,7 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message
 	ms, err := dht.messageSenderForPeer(ctx, p)
 	if err != nil {
 		if err == msmux.ErrNotSupported {
-			dht.RoutingTable().Remove(p)
+			dht.peerStoppedDHT(ctx, p)
 		}
 		stats.Record(ctx,
 			metrics.SentRequests.M(1),
@@ -188,7 +188,7 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message
 	rpmes, err := ms.SendRequest(ctx, pmes)
 	if err != nil {
 		if err == msmux.ErrNotSupported {
-			dht.RoutingTable().Remove(p)
+			dht.peerStoppedDHT(ctx, p)
 		}
 		stats.Record(ctx,
 			metrics.SentRequests.M(1),
@@ -214,7 +214,7 @@ func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message
 	ms, err := dht.messageSenderForPeer(ctx, p)
 	if err != nil {
 		if err == msmux.ErrNotSupported {
-			dht.RoutingTable().Remove(p)
+			dht.peerStoppedDHT(ctx, p)
 		}
 		stats.Record(ctx,
 			metrics.SentMessages.M(1),
@@ -225,7 +225,7 @@ func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message
 	if err := ms.SendMessage(ctx, pmes); err != nil {
 		if err == msmux.ErrNotSupported {
-			dht.RoutingTable().Remove(p)
+			dht.peerStoppedDHT(ctx, p)
 		}
 		stats.Record(ctx,
 			metrics.SentMessages.M(1),


@@ -43,6 +43,7 @@ type config struct {
 		refreshPeriod    time.Duration
 		autoRefresh      bool
 		latencyTolerance time.Duration
+		checkInterval    time.Duration
 	}

 	// internal parameters, not publicly exposed
@@ -80,7 +81,7 @@ var defaults = func(o *config) error {
 	o.routingTable.latencyTolerance = time.Minute
 	o.routingTable.refreshQueryTimeout = 10 * time.Second
-	o.routingTable.refreshPeriod = 1 * time.Hour
+	o.routingTable.refreshPeriod = 10 * time.Minute
 	o.routingTable.autoRefresh = true
 	o.maxRecordAge = time.Hour * 36
@@ -120,6 +121,14 @@ func (c *config) validate() error {
 	return nil
 }

+// RoutingTableCheckInterval is the interval between two runs of the RT cleanup routine.
+func RoutingTableCheckInterval(i time.Duration) Option {
+	return func(c *config) error {
+		c.routingTable.checkInterval = i
+		return nil
+	}
+}
+
 // RoutingTableLatencyTolerance sets the maximum acceptable latency for peers
 // in the routing table's cluster.
 func RoutingTableLatencyTolerance(latency time.Duration) Option {
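Editor's note: the new option is consumed like any other DHT option when constructing a node. A minimal sketch follows; the helper name newDHTWithCleanup and the one-minute interval are illustrative, while dht.New and dht.RoutingTableCheckInterval come from this repository as changed above.

package main

import (
	"context"
	"time"

	"github.com/libp2p/go-libp2p-core/host"
	dht "github.com/libp2p/go-libp2p-kad-dht"
)

// newDHTWithCleanup is a sketch (not part of this commit): it builds a DHT whose
// routing table cleanup/validation routine runs every minute.
func newDHTWithCleanup(ctx context.Context, h host.Host) (*dht.IpfsDHT, error) {
	return dht.New(ctx, h, dht.RoutingTableCheckInterval(time.Minute))
}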


@@ -856,83 +856,6 @@ func TestRefreshBelowMinRTThreshold(t *testing.T) {
 	assert.Equal(t, dhtE.self, dhtA.routingTable.Find(dhtE.self), "A's routing table should have peer E!")
 }

-// Check to make sure we re-fill the routing table from connected peers when it
-// completely empties.
-func TestEmptyTable(t *testing.T) {
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-
-	nDHTs := 50
-	dhts := setupDHTS(t, ctx, nDHTs)
-	defer func() {
-		for _, dht := range dhts {
-			dht.Close()
-			defer dht.host.Close()
-		}
-	}()
-
-	t.Logf("dhts are not connected. %d", nDHTs)
-	for _, dht := range dhts {
-		rtlen := dht.routingTable.Size()
-		if rtlen > 0 {
-			t.Errorf("routing table for %s should have 0 peers. has %d", dht.self, rtlen)
-		}
-	}
-
-	for i := 1; i < nDHTs; i++ {
-		connectNoSync(t, ctx, dhts[0], dhts[i])
-	}
-
-	// Wait till the routing table stabilizes.
-	oldSize := dhts[0].routingTable.Size()
-	for {
-		time.Sleep(time.Millisecond)
-		newSize := dhts[0].routingTable.Size()
-		if oldSize == newSize {
-			break
-		}
-		oldSize = newSize
-	}
-
-	// remove any one peer from the RT so we don't end up disconnecting all of them if the RT
-	// already has all peers we are connected to
-	dhts[0].routingTable.Remove(dhts[0].routingTable.ListPeers()[0])
-
-	if u.Debug {
-		printRoutingTables(dhts[:1])
-	}
-
-	// Disconnect from all peers that _were_ in the routing table.
-	routingTablePeers := make(map[peer.ID]bool, nDHTs)
-	for _, p := range dhts[0].RoutingTable().ListPeers() {
-		routingTablePeers[p] = true
-	}
-
-	oldDHTs := dhts[1:]
-	dhts = dhts[:1]
-	for _, dht := range oldDHTs {
-		if routingTablePeers[dht.Host().ID()] {
-			dhts[0].Host().Network().ClosePeer(dht.host.ID())
-			dht.Close()
-			dht.host.Close()
-		} else {
-			dhts = append(dhts, dht)
-		}
-	}
-
-	// we should now _re-add_ some peers to the routing table
-	for i := 0; i < 100; i++ {
-		if dhts[0].routingTable.Size() > 0 {
-			return
-		}
-		time.Sleep(time.Millisecond)
-	}
-
-	if u.Debug {
-		printRoutingTables(dhts[:1])
-	}
-	t.Fatal("routing table shouldn't have been empty")
-}
-
 func TestPeriodicRefresh(t *testing.T) {
 	if ci.IsRunning() {
 		t.Skip("skipping on CI. highly timing dependent")


@@ -40,7 +40,7 @@ func TestHungRequest(t *testing.T) {
 		defer s.Reset()
 		<-ctx.Done()
 	})
-	d.Update(ctx, hosts[1].ID())
+	d.peerFound(ctx, hosts[1].ID())

 	ctx1, cancel1 := context.WithTimeout(ctx, 1*time.Second)
 	defer cancel1()
@@ -214,7 +214,7 @@ func TestNotFound(t *testing.T) {
 	}

 	for _, p := range hosts {
-		d.Update(ctx, p.ID())
+		d.peerFound(ctx, p.ID())
 	}

 	// Reply with random peers to every message
@@ -294,7 +294,7 @@ func TestLessThanKResponses(t *testing.T) {
 	}

 	for i := 1; i < 5; i++ {
-		d.Update(ctx, hosts[i].ID())
+		d.peerFound(ctx, hosts[i].ID())
 	}

 	// Reply with random peers to every message
@@ -363,7 +363,7 @@ func TestMultipleQueries(t *testing.T) {
 		t.Fatal(err)
 	}

-	d.Update(ctx, hosts[1].ID())
+	d.peerFound(ctx, hosts[1].ID())

 	// It would be nice to be able to just get a value and succeed but then
 	// we'd need to deal with selectors and validators...

go.mod

@@ -14,7 +14,7 @@ require (
 	github.com/libp2p/go-eventbus v0.1.0
 	github.com/libp2p/go-libp2p v0.5.3-0.20200221174525-7ba322244e0a
 	github.com/libp2p/go-libp2p-core v0.3.2-0.20200305051524-d143201d83c2
-	github.com/libp2p/go-libp2p-kbucket v0.2.3
+	github.com/libp2p/go-libp2p-kbucket v0.3.1
 	github.com/libp2p/go-libp2p-peerstore v0.1.4
 	github.com/libp2p/go-libp2p-record v0.1.2
 	github.com/libp2p/go-libp2p-swarm v0.2.2

go.sum

@@ -195,6 +195,8 @@ github.com/libp2p/go-libp2p-discovery v0.2.0 h1:1p3YSOq7VsgaL+xVHPi8XAmtGyas6D2J
 github.com/libp2p/go-libp2p-discovery v0.2.0/go.mod h1:s4VGaxYMbw4+4+tsoQTqh7wfxg97AEdo4GYBt6BadWg=
 github.com/libp2p/go-libp2p-kbucket v0.2.3 h1:XtNfN4WUy0cfeJoJgWCf1lor4Pp3kBkFJ9vQ+Zs+VUM=
 github.com/libp2p/go-libp2p-kbucket v0.2.3/go.mod h1:opWrBZSWnBYPc315q497huxY3sz1t488X6OiXUEYWKA=
+github.com/libp2p/go-libp2p-kbucket v0.3.1 h1:aHSdqYBAyExg2xir/VN7B2myIN6yxUuDG0FeyrZpBQE=
+github.com/libp2p/go-libp2p-kbucket v0.3.1/go.mod h1:oyjT5O7tS9CQurok++ERgc46YLwEpuGoFq9ubvoUOio=
 github.com/libp2p/go-libp2p-loggables v0.1.0 h1:h3w8QFfCt2UJl/0/NW4K829HX/0S4KD31PQ7m8UXXO8=
 github.com/libp2p/go-libp2p-loggables v0.1.0/go.mod h1:EyumB2Y6PrYjr55Q3/tiJ/o3xoDasoRYM7nOzEpoa90=
 github.com/libp2p/go-libp2p-mplex v0.2.0/go.mod h1:Ejl9IyjvXJ0T9iqUTE1jpYATQ9NM3g+OtR+EMMODbKo=
@@ -401,6 +403,7 @@ github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H
 github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
 github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ=
 github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
+github.com/wangjia184/sortedset v0.0.0-20160527075905-f5d03557ba30/go.mod h1:YkocrP2K2tcw938x9gCOmT5G5eCD6jsTz0SZuyAqwIE=
 github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 h1:EKhdznlJHPMoKr0XTrX+IlJs1LH3lyx2nfr1dOlZ79k=
 github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1/go.mod h1:8UvriyWtv5Q5EOgjHaSseUEdkQfvwFv1I/In/O2M9gc=
 github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc h1:9lDbC6Rz4bwmou+oE6Dt4Cb2BGMur5eR/GYptkKUVHo=


@@ -89,7 +89,7 @@ func BenchmarkHandleFindPeer(b *testing.B) {
 			panic(err)
 		}

-		d.routingTable.Update(id)
+		d.peerFound(ctx, id)
 		peers = append(peers, id)

 		a, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 2000+i))


@@ -7,14 +7,16 @@ import (
 	"time"

 	tu "github.com/libp2p/go-libp2p-testing/etc"
+	"github.com/stretchr/testify/require"
 )

 func TestNotifieeMultipleConn(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
-	d1 := setupDHT(ctx, t, false)
-	d2 := setupDHT(ctx, t, false)
+	d1 := setupDHT(ctx, t, false, RoutingTableCheckInterval(50*time.Millisecond))
+	d2 := setupDHT(ctx, t, false, RoutingTableCheckInterval(50*time.Millisecond))

 	nn1, err := newSubscriberNotifiee(d1)
 	if err != nil {
@@ -36,6 +38,8 @@ func TestNotifieeMultipleConn(t *testing.T) {
 	if !checkRoutingTable(d1, d2) {
 		t.Fatal("no routes")
 	}
+
+	// we are still connected, so the disconnect notification should be a No-op
 	nn1.Disconnected(d1.host.Network(), c12)
 	nn2.Disconnected(d2.host.Network(), c21)
@@ -43,6 +47,8 @@ func TestNotifieeMultipleConn(t *testing.T) {
 		t.Fatal("no routes")
 	}

+	// the connection close should now mark the peer as missing in the RT for both peers
+	// because of the disconnect notification
 	for _, conn := range d1.host.Network().ConnsToPeer(d2.self) {
 		conn.Close()
 	}
@@ -50,20 +56,28 @@ func TestNotifieeMultipleConn(t *testing.T) {
 		conn.Close()
 	}

-	tu.WaitFor(ctx, func() error {
+	// close both the hosts so all connection attempts to them by RT Peer validation fail
+	d1.host.Close()
+	d2.host.Close()
+
+	// wait context will ensure that the RT cleanup completes
+	waitCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
+	defer cancel()
+
+	require.NoError(t, tu.WaitFor(waitCtx, func() error {
 		if checkRoutingTable(d1, d2) {
 			return fmt.Errorf("should not have routes")
 		}
 		return nil
-	})
+	}))
 }

 func TestNotifieeFuzz(t *testing.T) {
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
 	defer cancel()

-	d1 := setupDHT(ctx, t, false)
-	d2 := setupDHT(ctx, t, false)
+	d1 := setupDHT(ctx, t, false, RoutingTableCheckInterval(50*time.Millisecond))
+	d2 := setupDHT(ctx, t, false, RoutingTableCheckInterval(50*time.Millisecond))

 	for i := 0; i < 10; i++ {
 		connectNoSync(t, ctx, d1, d2)
@@ -71,13 +85,16 @@ func TestNotifieeFuzz(t *testing.T) {
 			conn.Close()
 		}
 	}
-	tu.WaitFor(ctx, func() error {
+
+	// close both hosts so peer validation reconnect fails
+	d1.host.Close()
+	d2.host.Close()
+	require.NoError(t, tu.WaitFor(ctx, func() error {
 		if checkRoutingTable(d1, d2) {
 			return fmt.Errorf("should not have routes")
 		}
 		return nil
-	})
-
-	connect(t, ctx, d1, d2)
+	}))
 }

 func checkRoutingTable(a, b *IpfsDHT) bool {


@@ -32,6 +32,10 @@ func newSubscriberNotifiee(dht *IpfsDHT) (*subscriberNotifee, error) {
 		// register for event bus protocol ID changes in order to update the routing table
 		new(event.EvtPeerProtocolsUpdated),
+
+		// register for event bus notifications for when our local address/addresses change so we can
+		// advertise those to the network
+		new(event.EvtLocalAddressesUpdated),
 	}

 	// register for event bus local routability changes in order to trigger switching between client and server modes
@@ -62,7 +66,7 @@ func newSubscriberNotifiee(dht *IpfsDHT) (*subscriberNotifee, error) {
 		return nil, fmt.Errorf("could not check peerstore for protocol support: err: %s", err)
 	}
 	if valid {
-		dht.Update(dht.ctx, p)
+		dht.peerFound(dht.ctx, p)
 	}
 }
@@ -82,6 +86,16 @@ func (nn *subscriberNotifee) subscribe(proc goprocess.Process) {
 		}

 		switch evt := e.(type) {
+		case event.EvtLocalAddressesUpdated:
+			// when our address changes, we should proactively tell our closest peers about it so
+			// we become discoverable quickly. The Identify protocol will push a signed peer record
+			// with our new address to all peers we are connected to. However, we might not necessarily be connected
+			// to our closet peers & so in the true spirit of Zen, searching for ourself in the network really is the best way
+			// to to forge connections with those matter.
+			select {
+			case dht.triggerSelfLookup <- nil:
+			default:
+			}
 		case event.EvtPeerIdentificationCompleted:
 			handlePeerIdentificationCompletedEvent(dht, evt)
 		case event.EvtPeerProtocolsUpdated:
@@ -117,7 +131,7 @@ func handlePeerIdentificationCompletedEvent(dht *IpfsDHT, e event.EvtPeerIdentif
 		return
 	}
 	if valid {
-		dht.Update(dht.ctx, e.Peer)
+		dht.peerFound(dht.ctx, e.Peer)
 		fixLowPeers(dht)
 	}
 }
@@ -130,9 +144,9 @@ func handlePeerProtocolsUpdatedEvent(dht *IpfsDHT, e event.EvtPeerProtocolsUpdat
 	}

 	if valid {
-		dht.routingTable.Update(e.Peer)
+		dht.peerFound(dht.ctx, e.Peer)
 	} else {
-		dht.routingTable.Remove(e.Peer)
+		dht.peerStoppedDHT(dht.ctx, e.Peer)
 	}

 	fixLowPeers(dht)
@@ -187,7 +201,7 @@ func fixLowPeers(dht *IpfsDHT) {
 		// Don't bother probing, we do that on connect.
 		valid, _ := dht.validRTPeer(p)
 		if valid {
-			dht.Update(dht.Context(), p)
+			dht.peerFound(dht.Context(), p)
 		}
 	}
@@ -218,7 +232,7 @@ func (nn *subscriberNotifee) Disconnected(n network.Network, v network.Conn) {
 		return
 	}

-	dht.routingTable.Remove(p)
+	dht.peerDisconnected(dht.ctx, p)
 	fixLowPeers(dht)
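Editor's note: the new EvtLocalAddressesUpdated subscription is exercised end to end by TestSelfWalkOnAddressChange above. The following sketch mirrors that test and is not part of this commit; the helper name emitAddrChange is hypothetical, and in practice the libp2p host emits this event itself when its listen addresses change.

// emitAddrChange is a hypothetical helper (assumed to sit in package dht): emitting
// EvtLocalAddressesUpdated on the host's event bus causes the subscriber above to do
// a non-blocking send on dht.triggerSelfLookup, which schedules a self walk.
func emitAddrChange(dht *IpfsDHT) error {
	em, err := dht.host.EventBus().Emitter(&event.EvtLocalAddressesUpdated{})
	if err != nil {
		return err
	}
	defer em.Close()
	return em.Emit(event.EvtLocalAddressesUpdated{})
}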