mirror of
https://github.com/fluencelabs/go-libp2p-kad-dht
synced 2025-04-25 06:42:13 +00:00
Enable switching DHT between client and server modes (#469)
* created Mode(ModeOpt) option for choosing between auto/client/server modes * Auto mode internally switches the DHT between client and server modes based on the EvtLocalReachabilityChanged event emitted on the event bus (e.g. by AutoNAT) * routing table management of peers that switch between client and server mode while we are connected to them (i.e. are in auto mode) * removed Client(bool) option, becoming a DHT client is specified using Mode(ModeClient) instead
This commit is contained in:
parent
bf4986ec8f
commit
c24a52fc7c
100
dht.go
100
dht.go
@ -40,6 +40,13 @@ var logger = logging.Logger("dht")
|
||||
|
||||
const BaseConnMgrScore = 5
|
||||
|
||||
type mode int
|
||||
|
||||
const (
|
||||
modeServer mode = 1
|
||||
modeClient = 2
|
||||
)
|
||||
|
||||
// IpfsDHT is an implementation of Kademlia with S/Kademlia modifications.
|
||||
// It is used to implement the base Routing module.
|
||||
type IpfsDHT struct {
|
||||
@ -69,6 +76,10 @@ type IpfsDHT struct {
|
||||
|
||||
protocols []protocol.ID // DHT protocols
|
||||
|
||||
auto bool
|
||||
mode mode
|
||||
modeLk sync.Mutex
|
||||
|
||||
bucketSize int
|
||||
alpha int // The concurrency parameter per path
|
||||
d int // Number of Disjoint Paths to query
|
||||
@ -114,17 +125,37 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er
|
||||
dht.enableProviders = cfg.EnableProviders
|
||||
dht.enableValues = cfg.EnableValues
|
||||
|
||||
// register for network notifs.
|
||||
dht.proc.Go((*subscriberNotifee)(dht).subscribe)
|
||||
// handle providers
|
||||
dht.proc.AddChild(dht.providers.Process())
|
||||
dht.Validator = cfg.Validator
|
||||
|
||||
if !cfg.Client {
|
||||
for _, p := range cfg.Protocols {
|
||||
h.SetStreamHandler(p, dht.handleNewStream)
|
||||
switch cfg.Mode {
|
||||
case opts.ModeAuto:
|
||||
dht.auto = true
|
||||
dht.mode = modeClient
|
||||
case opts.ModeClient:
|
||||
dht.auto = false
|
||||
dht.mode = modeClient
|
||||
case opts.ModeServer:
|
||||
dht.auto = false
|
||||
dht.mode = modeServer
|
||||
default:
|
||||
return nil, fmt.Errorf("invalid dht mode %d", cfg.Mode)
|
||||
}
|
||||
|
||||
if dht.mode == modeServer {
|
||||
if err := dht.moveToServerMode(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// register for event bus and network notifications
|
||||
sn, err := newSubscriberNotifiee(dht)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
dht.proc.Go(sn.subscribe)
|
||||
// handle providers
|
||||
dht.proc.AddChild(dht.providers.Process())
|
||||
|
||||
dht.startRefreshing()
|
||||
return dht, nil
|
||||
}
|
||||
@ -435,6 +466,61 @@ func (dht *IpfsDHT) betterPeersToQuery(pmes *pb.Message, p peer.ID, count int) [
|
||||
return filtered
|
||||
}
|
||||
|
||||
func (dht *IpfsDHT) setMode(m mode) error {
|
||||
dht.modeLk.Lock()
|
||||
defer dht.modeLk.Unlock()
|
||||
|
||||
if m == dht.mode {
|
||||
return nil
|
||||
}
|
||||
|
||||
switch m {
|
||||
case modeServer:
|
||||
return dht.moveToServerMode()
|
||||
case modeClient:
|
||||
return dht.moveToClientMode()
|
||||
default:
|
||||
return fmt.Errorf("unrecognized dht mode: %d", m)
|
||||
}
|
||||
}
|
||||
|
||||
func (dht *IpfsDHT) moveToServerMode() error {
|
||||
dht.mode = modeServer
|
||||
for _, p := range dht.protocols {
|
||||
dht.host.SetStreamHandler(p, dht.handleNewStream)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dht *IpfsDHT) moveToClientMode() error {
|
||||
dht.mode = modeClient
|
||||
for _, p := range dht.protocols {
|
||||
dht.host.RemoveStreamHandler(p)
|
||||
}
|
||||
|
||||
pset := make(map[protocol.ID]bool)
|
||||
for _, p := range dht.protocols {
|
||||
pset[p] = true
|
||||
}
|
||||
|
||||
for _, c := range dht.host.Network().Conns() {
|
||||
for _, s := range c.GetStreams() {
|
||||
if pset[s.Protocol()] {
|
||||
if s.Stat().Direction == network.DirInbound {
|
||||
s.Reset()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dht *IpfsDHT) getMode() mode {
|
||||
dht.modeLk.Lock()
|
||||
defer dht.modeLk.Unlock()
|
||||
return dht.mode
|
||||
}
|
||||
|
||||
// Context return dht's context
|
||||
func (dht *IpfsDHT) Context() context.Context {
|
||||
return dht.ctx
|
||||
|
18
dht_net.go
18
dht_net.go
@ -14,6 +14,7 @@ import (
|
||||
|
||||
"github.com/libp2p/go-libp2p-kad-dht/metrics"
|
||||
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
|
||||
msmux "github.com/multiformats/go-multistream"
|
||||
|
||||
ggio "github.com/gogo/protobuf/io"
|
||||
|
||||
@ -80,6 +81,11 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
|
||||
defer timer.Stop()
|
||||
|
||||
for {
|
||||
if dht.getMode() != modeServer {
|
||||
logger.Errorf("ignoring incoming dht message while not in server mode")
|
||||
return false
|
||||
}
|
||||
|
||||
var req pb.Message
|
||||
msgbytes, err := r.ReadMsg()
|
||||
msgLen := len(msgbytes)
|
||||
@ -167,6 +173,9 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message
|
||||
|
||||
ms, err := dht.messageSenderForPeer(ctx, p)
|
||||
if err != nil {
|
||||
if err == msmux.ErrNotSupported {
|
||||
dht.RoutingTable().Remove(p)
|
||||
}
|
||||
stats.Record(ctx,
|
||||
metrics.SentRequests.M(1),
|
||||
metrics.SentRequestErrors.M(1),
|
||||
@ -178,6 +187,9 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message
|
||||
|
||||
rpmes, err := ms.SendRequest(ctx, pmes)
|
||||
if err != nil {
|
||||
if err == msmux.ErrNotSupported {
|
||||
dht.RoutingTable().Remove(p)
|
||||
}
|
||||
stats.Record(ctx,
|
||||
metrics.SentRequests.M(1),
|
||||
metrics.SentRequestErrors.M(1),
|
||||
@ -201,6 +213,9 @@ func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message
|
||||
|
||||
ms, err := dht.messageSenderForPeer(ctx, p)
|
||||
if err != nil {
|
||||
if err == msmux.ErrNotSupported {
|
||||
dht.RoutingTable().Remove(p)
|
||||
}
|
||||
stats.Record(ctx,
|
||||
metrics.SentMessages.M(1),
|
||||
metrics.SentMessageErrors.M(1),
|
||||
@ -209,6 +224,9 @@ func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message
|
||||
}
|
||||
|
||||
if err := ms.SendMessage(ctx, pmes); err != nil {
|
||||
if err == msmux.ErrNotSupported {
|
||||
dht.RoutingTable().Remove(p)
|
||||
}
|
||||
stats.Record(ctx,
|
||||
metrics.SentMessages.M(1),
|
||||
metrics.SentMessageErrors.M(1),
|
||||
|
133
dht_test.go
133
dht_test.go
@ -13,8 +13,11 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/event"
|
||||
"github.com/libp2p/go-libp2p-core/network"
|
||||
"github.com/libp2p/go-libp2p-core/peer"
|
||||
"github.com/libp2p/go-libp2p-core/peerstore"
|
||||
"github.com/libp2p/go-libp2p-core/protocol"
|
||||
"github.com/libp2p/go-libp2p-core/routing"
|
||||
"github.com/multiformats/go-multihash"
|
||||
"github.com/multiformats/go-multistream"
|
||||
@ -128,14 +131,21 @@ func (testAtomicPutValidator) Select(_ string, bs [][]byte) (int, error) {
|
||||
}
|
||||
|
||||
func setupDHT(ctx context.Context, t *testing.T, client bool, options ...opts.Option) *IpfsDHT {
|
||||
baseOpts := []opts.Option{
|
||||
opts.NamespacedValidator("v", blankValidator{}),
|
||||
opts.DisableAutoRefresh(),
|
||||
}
|
||||
|
||||
if client {
|
||||
baseOpts = append(baseOpts, opts.Mode(opts.ModeClient))
|
||||
} else {
|
||||
baseOpts = append(baseOpts, opts.Mode(opts.ModeServer))
|
||||
}
|
||||
|
||||
d, err := New(
|
||||
ctx,
|
||||
bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)),
|
||||
append([]opts.Option{
|
||||
opts.Client(client),
|
||||
opts.NamespacedValidator("v", blankValidator{}),
|
||||
opts.DisableAutoRefresh(),
|
||||
}, options...)...,
|
||||
append(baseOpts, options...)...,
|
||||
)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
@ -791,7 +801,7 @@ func TestRefreshBelowMinRTThreshold(t *testing.T) {
|
||||
dhtA, err := New(
|
||||
ctx,
|
||||
bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)),
|
||||
opts.Client(false),
|
||||
opts.Mode(opts.ModeServer),
|
||||
opts.NamespacedValidator("v", blankValidator{}),
|
||||
)
|
||||
if err != nil {
|
||||
@ -1604,6 +1614,44 @@ func TestProvideDisabled(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleRemotePeerProtocolChanges(t *testing.T) {
|
||||
proto := protocol.ID("/v1/dht")
|
||||
ctx := context.Background()
|
||||
os := []opts.Option{
|
||||
opts.Protocols(proto),
|
||||
opts.Mode(opts.ModeServer),
|
||||
opts.NamespacedValidator("v", blankValidator{}),
|
||||
opts.DisableAutoRefresh(),
|
||||
}
|
||||
|
||||
// start host 1 that speaks dht v1
|
||||
dhtA, err := New(ctx, bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)), os...)
|
||||
require.NoError(t, err)
|
||||
defer dhtA.Close()
|
||||
|
||||
// start host 2 that also speaks dht v1
|
||||
dhtB, err := New(ctx, bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)), os...)
|
||||
require.NoError(t, err)
|
||||
defer dhtB.Close()
|
||||
|
||||
connect(t, ctx, dhtA, dhtB)
|
||||
|
||||
// now assert both have each other in their RT
|
||||
require.True(t, waitForWellFormedTables(t, []*IpfsDHT{dhtA, dhtB}, 1, 1, 10*time.Second), "both RT should have one peer each")
|
||||
|
||||
// dhtB becomes a client
|
||||
require.NoError(t, dhtB.setMode(modeClient))
|
||||
|
||||
// which means that dhtA should evict it from it's RT
|
||||
require.True(t, waitForWellFormedTables(t, []*IpfsDHT{dhtA}, 0, 0, 10*time.Second), "dHTA routing table should have 0 peers")
|
||||
|
||||
// dhtB becomes a server
|
||||
require.NoError(t, dhtB.setMode(modeServer))
|
||||
|
||||
// which means dhtA should have it in the RT again
|
||||
require.True(t, waitForWellFormedTables(t, []*IpfsDHT{dhtA}, 1, 1, 10*time.Second), "dHTA routing table should have 1 peers")
|
||||
}
|
||||
|
||||
func TestGetSetPluggedProtocol(t *testing.T) {
|
||||
t.Run("PutValue/GetValue - same protocol", func(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
@ -1611,7 +1659,7 @@ func TestGetSetPluggedProtocol(t *testing.T) {
|
||||
|
||||
os := []opts.Option{
|
||||
opts.Protocols("/esh/dht"),
|
||||
opts.Client(false),
|
||||
opts.Mode(opts.ModeServer),
|
||||
opts.NamespacedValidator("v", blankValidator{}),
|
||||
opts.DisableAutoRefresh(),
|
||||
}
|
||||
@ -1650,7 +1698,7 @@ func TestGetSetPluggedProtocol(t *testing.T) {
|
||||
|
||||
dhtA, err := New(ctx, bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)), []opts.Option{
|
||||
opts.Protocols("/esh/dht"),
|
||||
opts.Client(false),
|
||||
opts.Mode(opts.ModeServer),
|
||||
opts.NamespacedValidator("v", blankValidator{}),
|
||||
opts.DisableAutoRefresh(),
|
||||
}...)
|
||||
@ -1660,7 +1708,7 @@ func TestGetSetPluggedProtocol(t *testing.T) {
|
||||
|
||||
dhtB, err := New(ctx, bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)), []opts.Option{
|
||||
opts.Protocols("/lsr/dht"),
|
||||
opts.Client(false),
|
||||
opts.Mode(opts.ModeServer),
|
||||
opts.NamespacedValidator("v", blankValidator{}),
|
||||
opts.DisableAutoRefresh(),
|
||||
}...)
|
||||
@ -1704,3 +1752,70 @@ func TestClientModeAtInit(t *testing.T) {
|
||||
err := pinger.Ping(context.Background(), client.PeerID())
|
||||
assert.True(t, xerrors.Is(err, multistream.ErrNotSupported))
|
||||
}
|
||||
|
||||
func TestModeChange(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
clientOnly := setupDHT(ctx, t, true)
|
||||
clientToServer := setupDHT(ctx, t, true)
|
||||
clientOnly.Host().Peerstore().AddAddrs(clientToServer.PeerID(), clientToServer.Host().Addrs(), peerstore.AddressTTL)
|
||||
err := clientOnly.Ping(ctx, clientToServer.PeerID())
|
||||
assert.True(t, xerrors.Is(err, multistream.ErrNotSupported))
|
||||
err = clientToServer.setMode(modeServer)
|
||||
assert.Nil(t, err)
|
||||
err = clientOnly.Ping(ctx, clientToServer.PeerID())
|
||||
assert.Nil(t, err)
|
||||
err = clientToServer.setMode(modeClient)
|
||||
assert.Nil(t, err)
|
||||
err = clientOnly.Ping(ctx, clientToServer.PeerID())
|
||||
assert.NotNil(t, err)
|
||||
}
|
||||
|
||||
func TestDynamicModeSwitching(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
prober := setupDHT(ctx, t, true) // our test harness
|
||||
node := setupDHT(ctx, t, true, opts.Mode(opts.ModeAuto)) // the node under test
|
||||
prober.Host().Peerstore().AddAddrs(node.PeerID(), node.Host().Addrs(), peerstore.AddressTTL)
|
||||
if _, err := prober.Host().Network().DialPeer(ctx, node.PeerID()); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
emitter, err := node.host.EventBus().Emitter(new(event.EvtLocalReachabilityChanged))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
assertDHTClient := func() {
|
||||
err = prober.Ping(ctx, node.PeerID())
|
||||
assert.True(t, xerrors.Is(err, multistream.ErrNotSupported))
|
||||
if l := len(prober.RoutingTable().ListPeers()); l != 0 {
|
||||
t.Errorf("expected routing table length to be 0; instead is %d", l)
|
||||
}
|
||||
}
|
||||
|
||||
assertDHTServer := func() {
|
||||
err = prober.Ping(ctx, node.PeerID())
|
||||
assert.Nil(t, err)
|
||||
if l := len(prober.RoutingTable().ListPeers()); l != 1 {
|
||||
t.Errorf("expected routing table length to be 1; instead is %d", l)
|
||||
}
|
||||
}
|
||||
|
||||
emitter.Emit(event.EvtLocalReachabilityChanged{Reachability: network.ReachabilityPrivate})
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
assertDHTClient()
|
||||
|
||||
emitter.Emit(event.EvtLocalReachabilityChanged{Reachability: network.ReachabilityPublic})
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
assertDHTServer()
|
||||
|
||||
emitter.Emit(event.EvtLocalReachabilityChanged{Reachability: network.ReachabilityUnknown})
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
|
||||
assertDHTClient()
|
||||
}
|
||||
|
@ -81,7 +81,7 @@ func TestGetFailures(t *testing.T) {
|
||||
host1 := bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport))
|
||||
host2 := bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport))
|
||||
|
||||
d, err := New(ctx, host1, opts.DisableAutoRefresh())
|
||||
d, err := New(ctx, host1, opts.DisableAutoRefresh(), opts.Mode(opts.ModeServer))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
2
go.mod
2
go.mod
@ -13,7 +13,7 @@ require (
|
||||
github.com/jbenet/goprocess v0.1.3
|
||||
github.com/libp2p/go-eventbus v0.1.0
|
||||
github.com/libp2p/go-libp2p v0.5.3-0.20200221174525-7ba322244e0a
|
||||
github.com/libp2p/go-libp2p-core v0.3.1
|
||||
github.com/libp2p/go-libp2p-core v0.3.2-0.20200305051524-d143201d83c2
|
||||
github.com/libp2p/go-libp2p-kbucket v0.2.3
|
||||
github.com/libp2p/go-libp2p-peerstore v0.1.4
|
||||
github.com/libp2p/go-libp2p-record v0.1.2
|
||||
|
2
go.sum
2
go.sum
@ -187,6 +187,8 @@ github.com/libp2p/go-libp2p-core v0.3.0 h1:F7PqduvrztDtFsAa/bcheQ3azmNo+Nq7m8hQY
|
||||
github.com/libp2p/go-libp2p-core v0.3.0/go.mod h1:ACp3DmS3/N64c2jDzcV429ukDpicbL6+TrrxANBjPGw=
|
||||
github.com/libp2p/go-libp2p-core v0.3.1 h1:hEnSDjScfjYvPHoTgZhC4F62M8K1x1Oco/BY0RZ1N3s=
|
||||
github.com/libp2p/go-libp2p-core v0.3.1/go.mod h1:thvWy0hvaSBhnVBaW37BvzgVV68OUhgJJLAa6almrII=
|
||||
github.com/libp2p/go-libp2p-core v0.3.2-0.20200305051524-d143201d83c2 h1:0CAWwNiZCcDRAwoPdr3u2WTjJvHeKMw7unB557IQPH0=
|
||||
github.com/libp2p/go-libp2p-core v0.3.2-0.20200305051524-d143201d83c2/go.mod h1:49XGI+kc38oGVwqSBhDEwytaAxgZasHhFfQKibzTls0=
|
||||
github.com/libp2p/go-libp2p-crypto v0.1.0 h1:k9MFy+o2zGDNGsaoZl0MA3iZ75qXxr9OOoAZF+sD5OQ=
|
||||
github.com/libp2p/go-libp2p-crypto v0.1.0/go.mod h1:sPUokVISZiy+nNuTTH/TY+leRSxnFj/2GLjtOTW90hI=
|
||||
github.com/libp2p/go-libp2p-discovery v0.2.0 h1:1p3YSOq7VsgaL+xVHPi8XAmtGyas6D2J6rWBEfz/aiY=
|
||||
|
@ -16,8 +16,14 @@ func TestNotifieeMultipleConn(t *testing.T) {
|
||||
d1 := setupDHT(ctx, t, false)
|
||||
d2 := setupDHT(ctx, t, false)
|
||||
|
||||
nn1 := (*subscriberNotifee)(d1)
|
||||
nn2 := (*subscriberNotifee)(d2)
|
||||
nn1, err := newSubscriberNotifiee(d1)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
nn2, err := newSubscriberNotifiee(d2)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
connect(t, ctx, d1, d2)
|
||||
c12 := d1.host.Network().ConnsToPeer(d2.self)[0]
|
||||
|
@ -19,11 +19,24 @@ var (
|
||||
DefaultProtocols = []protocol.ID{ProtocolDHT}
|
||||
)
|
||||
|
||||
// ModeOpt describes what mode the dht should operate in
|
||||
type ModeOpt int
|
||||
|
||||
const (
|
||||
// ModeAuto utilizes EvtLocalReachabilityChanged events sent over the event bus to dynamically switch the DHT
|
||||
// between Client and Server modes based on network conditions
|
||||
ModeAuto ModeOpt = iota
|
||||
// ModeClient operates the DHT as a client only, it cannot respond to incoming queries
|
||||
ModeClient
|
||||
// ModeServer operates the DHT as a server, it can both send and respond to queries
|
||||
ModeServer
|
||||
)
|
||||
|
||||
// Options is a structure containing all the options that can be used when constructing a DHT.
|
||||
type Options struct {
|
||||
Datastore ds.Batching
|
||||
Validator record.Validator
|
||||
Client bool
|
||||
Mode ModeOpt
|
||||
Protocols []protocol.ID
|
||||
BucketSize int
|
||||
DisjointPaths int
|
||||
@ -122,7 +135,19 @@ func Datastore(ds ds.Batching) Option {
|
||||
// Defaults to false.
|
||||
func Client(only bool) Option {
|
||||
return func(o *Options) error {
|
||||
o.Client = only
|
||||
if only {
|
||||
o.Mode = ModeClient
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// Mode configures which mode the DHT operates in (Client, Server, Auto).
|
||||
//
|
||||
// Defaults to ModeAuto.
|
||||
func Mode(m ModeOpt) Option {
|
||||
return func(o *Options) error {
|
||||
o.Mode = m
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
@ -1,6 +1,8 @@
|
||||
package dht
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/libp2p/go-libp2p-core/event"
|
||||
"github.com/libp2p/go-libp2p-core/network"
|
||||
|
||||
@ -14,82 +16,173 @@ import (
|
||||
// subscriberNotifee implements network.Notifee and also manages the subscriber to the event bus. We consume peer
|
||||
// identification events to trigger inclusion in the routing table, and we consume Disconnected events to eject peers
|
||||
// from it.
|
||||
type subscriberNotifee IpfsDHT
|
||||
|
||||
func (nn *subscriberNotifee) DHT() *IpfsDHT {
|
||||
return (*IpfsDHT)(nn)
|
||||
type subscriberNotifee struct {
|
||||
dht *IpfsDHT
|
||||
subs event.Subscription
|
||||
}
|
||||
|
||||
func (nn *subscriberNotifee) subscribe(proc goprocess.Process) {
|
||||
dht := nn.DHT()
|
||||
func newSubscriberNotifiee(dht *IpfsDHT) (*subscriberNotifee, error) {
|
||||
bufSize := eventbus.BufSize(256)
|
||||
|
||||
dht.host.Network().Notify(nn)
|
||||
defer dht.host.Network().StopNotify(nn)
|
||||
|
||||
var err error
|
||||
evts := []interface{}{
|
||||
&event.EvtPeerIdentificationCompleted{},
|
||||
// register for event bus notifications of when peers successfully complete identification in order to update
|
||||
// the routing table
|
||||
new(event.EvtPeerIdentificationCompleted),
|
||||
|
||||
// register for event bus protocol ID changes in order to update the routing table
|
||||
new(event.EvtPeerProtocolsUpdated),
|
||||
}
|
||||
|
||||
// subscribe to the EvtPeerIdentificationCompleted event which notifies us every time a peer successfully completes identification
|
||||
sub, err := dht.host.EventBus().Subscribe(evts, eventbus.BufSize(256))
|
||||
// register for event bus local routability changes in order to trigger switching between client and server modes
|
||||
// only register for events if the DHT is operating in ModeAuto
|
||||
if dht.auto {
|
||||
evts = append(evts, new(event.EvtLocalReachabilityChanged))
|
||||
}
|
||||
|
||||
subs, err := dht.host.EventBus().Subscribe(evts, bufSize)
|
||||
if err != nil {
|
||||
logger.Errorf("dht not subscribed to peer identification events; things will fail; err: %s", err)
|
||||
return nil, fmt.Errorf("dht could not subscribe to eventbus events; err: %s", err)
|
||||
}
|
||||
defer sub.Close()
|
||||
|
||||
nn := &subscriberNotifee{
|
||||
dht: dht,
|
||||
subs: subs,
|
||||
}
|
||||
|
||||
// register for network notifications
|
||||
dht.host.Network().Notify(nn)
|
||||
|
||||
// Fill routing table with currently connected peers that are DHT servers
|
||||
dht.plk.Lock()
|
||||
defer dht.plk.Unlock()
|
||||
for _, p := range dht.host.Network().Peers() {
|
||||
protos, err := dht.peerstore.SupportsProtocols(p, dht.protocolStrs()...)
|
||||
if err == nil && len(protos) != 0 {
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not check peerstore for protocol support: err: %s", err)
|
||||
}
|
||||
if len(protos) != 0 {
|
||||
dht.Update(dht.ctx, p)
|
||||
}
|
||||
}
|
||||
dht.plk.Unlock()
|
||||
|
||||
return nn, nil
|
||||
}
|
||||
|
||||
func (nn *subscriberNotifee) subscribe(proc goprocess.Process) {
|
||||
dht := nn.dht
|
||||
defer dht.host.Network().StopNotify(nn)
|
||||
defer nn.subs.Close()
|
||||
|
||||
for {
|
||||
select {
|
||||
case evt, more := <-sub.Out():
|
||||
// we will not be getting any more events
|
||||
case e, more := <-nn.subs.Out():
|
||||
if !more {
|
||||
return
|
||||
}
|
||||
|
||||
// something has gone really wrong if we get an event for another type
|
||||
ev, ok := evt.(event.EvtPeerIdentificationCompleted)
|
||||
if !ok {
|
||||
logger.Errorf("got wrong type from subscription: %T", ev)
|
||||
return
|
||||
switch evt := e.(type) {
|
||||
case event.EvtPeerIdentificationCompleted:
|
||||
handlePeerIdentificationCompletedEvent(dht, evt)
|
||||
case event.EvtPeerProtocolsUpdated:
|
||||
handlePeerProtocolsUpdatedEvent(dht, evt)
|
||||
case event.EvtLocalReachabilityChanged:
|
||||
if dht.auto {
|
||||
handleLocalReachabilityChangedEvent(dht, evt)
|
||||
} else {
|
||||
// something has gone really wrong if we get an event we did not subscribe to
|
||||
logger.Errorf("received LocalReachabilityChanged event that was not subscribed to")
|
||||
}
|
||||
|
||||
dht.plk.Lock()
|
||||
if dht.host.Network().Connectedness(ev.Peer) != network.Connected {
|
||||
dht.plk.Unlock()
|
||||
continue
|
||||
}
|
||||
|
||||
// if the peer supports the DHT protocol, add it to our RT and kick a refresh if needed
|
||||
protos, err := dht.peerstore.SupportsProtocols(ev.Peer, dht.protocolStrs()...)
|
||||
if err == nil && len(protos) != 0 {
|
||||
refresh := dht.routingTable.Size() <= minRTRefreshThreshold
|
||||
dht.Update(dht.ctx, ev.Peer)
|
||||
if refresh && dht.autoRefresh {
|
||||
select {
|
||||
case dht.triggerRtRefresh <- nil:
|
||||
default:
|
||||
// something has gone really wrong if we get an event for another type
|
||||
logger.Errorf("got wrong type from subscription: %T", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
dht.plk.Unlock()
|
||||
|
||||
case <-proc.Closing():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func handlePeerIdentificationCompletedEvent(dht *IpfsDHT, e event.EvtPeerIdentificationCompleted) {
|
||||
dht.plk.Lock()
|
||||
defer dht.plk.Unlock()
|
||||
if dht.host.Network().Connectedness(e.Peer) != network.Connected {
|
||||
return
|
||||
}
|
||||
|
||||
// if the peer supports the DHT protocol, add it to our RT and kick a refresh if needed
|
||||
protos, err := dht.peerstore.SupportsProtocols(e.Peer, dht.protocolStrs()...)
|
||||
if err != nil {
|
||||
logger.Errorf("could not check peerstore for protocol support: err: %s", err)
|
||||
return
|
||||
}
|
||||
if len(protos) != 0 {
|
||||
dht.Update(dht.ctx, e.Peer)
|
||||
fixLowPeers(dht)
|
||||
}
|
||||
}
|
||||
|
||||
func handlePeerProtocolsUpdatedEvent(dht *IpfsDHT, e event.EvtPeerProtocolsUpdated) {
|
||||
protos, err := dht.peerstore.SupportsProtocols(e.Peer, dht.protocolStrs()...)
|
||||
if err != nil {
|
||||
logger.Errorf("could not check peerstore for protocol support: err: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
if len(protos) > 0 {
|
||||
dht.routingTable.Update(e.Peer)
|
||||
} else {
|
||||
dht.routingTable.Remove(e.Peer)
|
||||
}
|
||||
|
||||
fixLowPeers(dht)
|
||||
}
|
||||
|
||||
func handleLocalReachabilityChangedEvent(dht *IpfsDHT, e event.EvtLocalReachabilityChanged) {
|
||||
var target mode
|
||||
|
||||
switch e.Reachability {
|
||||
case network.ReachabilityPrivate, network.ReachabilityUnknown:
|
||||
target = modeClient
|
||||
case network.ReachabilityPublic:
|
||||
target = modeServer
|
||||
}
|
||||
|
||||
logger.Infof("processed event %T; performing dht mode switch", e)
|
||||
|
||||
err := dht.setMode(target)
|
||||
// NOTE: the mode will be printed out as a decimal.
|
||||
if err == nil {
|
||||
logger.Infof("switched DHT mode successfully; new mode: %d", target)
|
||||
} else {
|
||||
logger.Warningf("switching DHT mode failed; new mode: %d, err: %s", target, err)
|
||||
}
|
||||
}
|
||||
|
||||
// fixLowPeers tries to get more peers into the routing table if we're below the threshold
|
||||
func fixLowPeers(dht *IpfsDHT) {
|
||||
if dht.routingTable.Size() > minRTRefreshThreshold {
|
||||
return
|
||||
}
|
||||
|
||||
// Passively add peers we already know about
|
||||
for _, p := range dht.host.Network().Peers() {
|
||||
// Don't bother probing, we do that on connect.
|
||||
protos, err := dht.peerstore.SupportsProtocols(p, dht.protocolStrs()...)
|
||||
if err == nil && len(protos) != 0 {
|
||||
dht.Update(dht.Context(), p)
|
||||
}
|
||||
}
|
||||
|
||||
if dht.autoRefresh {
|
||||
select {
|
||||
case dht.triggerRtRefresh <- nil:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (nn *subscriberNotifee) Disconnected(n network.Network, v network.Conn) {
|
||||
dht := nn.DHT()
|
||||
dht := nn.dht
|
||||
select {
|
||||
case <-dht.Process().Closing():
|
||||
return
|
||||
@ -109,16 +202,7 @@ func (nn *subscriberNotifee) Disconnected(n network.Network, v network.Conn) {
|
||||
|
||||
dht.routingTable.Remove(p)
|
||||
|
||||
if dht.routingTable.Size() < minRTRefreshThreshold {
|
||||
// TODO: Actively bootstrap. For now, just try to add the currently connected peers.
|
||||
for _, p := range dht.host.Network().Peers() {
|
||||
// Don't bother probing, we do that on connect.
|
||||
protos, err := dht.peerstore.SupportsProtocols(p, dht.protocolStrs()...)
|
||||
if err == nil && len(protos) != 0 {
|
||||
dht.Update(dht.Context(), p)
|
||||
}
|
||||
}
|
||||
}
|
||||
fixLowPeers(dht)
|
||||
|
||||
dht.smlk.Lock()
|
||||
defer dht.smlk.Unlock()
|
||||
|
Loading…
x
Reference in New Issue
Block a user